refactor: split repartition region descriptors (#8172)

* refactor: split repartition region descriptors

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta-srv): support default source repartition planning

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta-srv): support default source repartition metadata

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-05-26 20:54:54 +08:00
committed by GitHub
parent 0675cffe68
commit 5943b41067
14 changed files with 894 additions and 191 deletions

View File

@@ -853,7 +853,7 @@ mod tests {
use crate::procedure::repartition::deallocate_region::DeallocateRegion;
use crate::procedure::repartition::dispatch::Dispatch;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
use crate::procedure::repartition::repartition_end::RepartitionEnd;
use crate::procedure::repartition::test_util::{
TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids,
@@ -864,16 +864,16 @@ mod tests {
fn test_plan(table_id: TableId) -> RepartitionPlanEntry {
RepartitionPlanEntry {
group_id: uuid::Uuid::new_v4(),
source_regions: vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
}],
source_regions: vec![SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 1),
range_expr("x", 0, 100),
)],
target_regions: vec![
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 50),
},
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 50, 100),
},
@@ -1209,16 +1209,16 @@ mod tests {
);
let succeeded_plan = RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
}],
source_regions: vec![SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 2),
range_expr("x", 100, 200),
)],
target_regions: vec![
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 150),
},
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 4),
partition_expr: range_expr("x", 150, 200),
},
@@ -1292,16 +1292,16 @@ mod tests {
);
let succeeded_plan = RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
}],
source_regions: vec![SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 2),
range_expr("x", 100, 200),
)],
target_regions: vec![
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 150),
},
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 4),
partition_expr: range_expr("x", 150, 200),
},
@@ -1567,16 +1567,16 @@ mod tests {
let failed_merge_plan = RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
},
SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 1),
range_expr("x", 0, 100),
),
SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 2),
range_expr("x", 100, 200),
),
],
target_regions: vec![RegionDescriptor {
target_regions: vec![TargetRegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 200),
}],
@@ -1587,16 +1587,16 @@ mod tests {
};
let succeeded_split_plan = RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![RegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 200, 300),
}],
source_regions: vec![SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 3),
range_expr("x", 200, 300),
)],
target_regions: vec![
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 200, 250),
},
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 4),
partition_expr: range_expr("x", 250, 300),
},

View File

@@ -35,7 +35,7 @@ use tokio::time::Instant;
use crate::error::{self, Result};
use crate::procedure::repartition::dispatch::Dispatch;
use crate::procedure::repartition::plan::{
AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
AllocationPlanEntry, RepartitionPlanEntry, TargetRegionDescriptor,
convert_allocation_plan_to_repartition_plan,
};
use crate::procedure::repartition::{Context, State};
@@ -324,7 +324,7 @@ impl AllocateRegion {
/// Collects all regions that need to be allocated from the repartition plan entries.
fn collect_allocate_regions(
repartition_plan_entries: &[RepartitionPlanEntry],
) -> Vec<&RegionDescriptor> {
) -> Vec<&TargetRegionDescriptor> {
repartition_plan_entries
.iter()
.flat_map(|p| p.allocate_regions())
@@ -333,7 +333,7 @@ impl AllocateRegion {
/// Prepares region allocation data: region numbers and their partition expressions.
fn prepare_region_allocation_data(
allocate_regions: &[&RegionDescriptor],
allocate_regions: &[&TargetRegionDescriptor],
) -> Result<Vec<(RegionNumber, String)>> {
allocate_regions
.iter()
@@ -417,6 +417,7 @@ mod tests {
use super::*;
use crate::procedure::repartition::State;
use crate::procedure::repartition::plan::SourceRegionDescriptor;
use crate::procedure::repartition::test_util::{
TestingEnv, current_parent_region_routes, new_parent_context, range_expr,
test_region_wal_options,
@@ -428,8 +429,21 @@ mod tests {
col: &str,
start: i64,
end: i64,
) -> RegionDescriptor {
RegionDescriptor {
) -> SourceRegionDescriptor {
SourceRegionDescriptor::partitioned(
RegionId::new(table_id, region_number),
range_expr(col, start, end),
)
}
fn create_target_region_descriptor(
table_id: TableId,
region_number: u32,
col: &str,
start: i64,
end: i64,
) -> TargetRegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, region_number),
partition_expr: range_expr(col, start, end),
}
@@ -700,10 +714,10 @@ mod tests {
fn test_prepare_region_allocation_data() {
let table_id = 1024;
let regions = [
create_region_descriptor(table_id, 10, "x", 0, 50),
create_region_descriptor(table_id, 11, "x", 50, 100),
create_target_region_descriptor(table_id, 10, "x", 0, 50),
create_target_region_descriptor(table_id, 11, "x", 50, 100),
];
let region_refs: Vec<&RegionDescriptor> = regions.iter().collect();
let region_refs: Vec<&TargetRegionDescriptor> = regions.iter().collect();
let result = AllocateRegion::prepare_region_allocation_data(&region_refs).unwrap();
@@ -732,7 +746,7 @@ mod tests {
ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![],
target_regions: vec![create_region_descriptor(table_id, 3, "x", 0, 100)],
target_regions: vec![create_target_region_descriptor(table_id, 3, "x", 0, 100)],
allocated_region_ids: vec![RegionId::new(table_id, 3)],
pending_deallocate_region_ids: vec![],
transition_map: vec![],

View File

@@ -25,22 +25,22 @@ use store_api::storage::RegionId;
use crate::error::Result;
use crate::procedure::repartition::collect::{Collect, ProcedureMeta};
use crate::procedure::repartition::group::RepartitionGroupProcedure;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
use crate::procedure::repartition::{self, Context, State};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Dispatch;
pub(crate) fn build_region_mapping(
source_regions: &[RegionDescriptor],
target_regions: &[RegionDescriptor],
source_regions: &[SourceRegionDescriptor],
target_regions: &[TargetRegionDescriptor],
transition_map: &[Vec<usize>],
) -> HashMap<RegionId, Vec<RegionId>> {
transition_map
.iter()
.enumerate()
.map(|(source_idx, indices)| {
let source_region = source_regions[source_idx].region_id;
let source_region = source_regions[source_idx].region_id();
let target_regions = indices
.iter()
.map(|&target_idx| target_regions[target_idx].region_id)

View File

@@ -49,7 +49,7 @@ use uuid::Uuid;
use crate::error::{self, Result};
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
use crate::procedure::repartition::utils::get_datanode_table_value;
use crate::procedure::repartition::{self};
use crate::service::mailbox::MailboxRef;
@@ -330,9 +330,9 @@ pub struct PersistentContext {
/// The schema name of the repartition group.
pub schema_name: String,
/// The source regions of the repartition group.
pub sources: Vec<RegionDescriptor>,
pub sources: Vec<SourceRegionDescriptor>,
/// The target regions of the repartition group.
pub targets: Vec<RegionDescriptor>,
pub targets: Vec<TargetRegionDescriptor>,
/// For each `source region`, the corresponding
/// `target regions` that overlap with it.
pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
@@ -360,8 +360,8 @@ impl PersistentContext {
table_id: TableId,
catalog_name: String,
schema_name: String,
sources: Vec<RegionDescriptor>,
targets: Vec<RegionDescriptor>,
sources: Vec<SourceRegionDescriptor>,
targets: Vec<TargetRegionDescriptor>,
region_mapping: HashMap<RegionId, Vec<RegionId>>,
sync_region: bool,
allocated_region_ids: Vec<RegionId>,
@@ -392,7 +392,7 @@ impl PersistentContext {
SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
]);
for source in &self.sources {
lock_keys.push(RegionLock::Write(source.region_id).into());
lock_keys.push(RegionLock::Write(source.region_id()).into());
}
lock_keys
}

View File

@@ -37,7 +37,7 @@ use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
use crate::procedure::repartition::group::{Context, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::TargetRegionDescriptor;
use crate::service::mailbox::{Channel, MailboxRef};
#[derive(Debug, Serialize, Deserialize)]
@@ -75,7 +75,7 @@ impl ApplyStagingManifest {
fn build_apply_staging_manifest_instructions(
staging_manifest_paths: &HashMap<RegionId, String>,
target_routes: &[RegionRoute],
targets: &[RegionDescriptor],
targets: &[TargetRegionDescriptor],
central_region_id: RegionId,
) -> Result<ApplyStagingManifestInstructions> {
let target_partition_expr_by_region = targets

View File

@@ -38,7 +38,7 @@ use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
use crate::procedure::repartition::group::{Context, GroupId, GroupPrepareResult, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::TargetRegionDescriptor;
use crate::procedure::utils::{self, ErrorStrategy};
use crate::service::mailbox::{Channel, MailboxRef};
@@ -77,7 +77,7 @@ impl EnterStagingRegion {
fn build_enter_staging_instructions(
group_id: GroupId,
prepare_result: &GroupPrepareResult,
targets: &[RegionDescriptor],
targets: &[TargetRegionDescriptor],
pending_deallocate_region_ids: &[RegionId],
) -> Result<HashMap<Peer, Vec<common_meta::instruction::EnterStagingRegion>>> {
let target_partition_expr_by_region = targets
@@ -454,7 +454,7 @@ mod tests {
use crate::error::{self, Error};
use crate::procedure::repartition::group::GroupPrepareResult;
use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::TargetRegionDescriptor;
use crate::procedure::repartition::test_util::{
TestingEnv, new_persistent_context, range_expr,
};
@@ -720,13 +720,13 @@ mod tests {
}
}
fn test_targets() -> Vec<RegionDescriptor> {
fn test_targets() -> Vec<TargetRegionDescriptor> {
vec![
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10),
},
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 10, 20),
},

View File

@@ -30,7 +30,7 @@ use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
use crate::procedure::repartition::group::{Context, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
use crate::service::mailbox::{Channel, MailboxRef};
#[derive(Debug, Serialize, Deserialize)]
@@ -98,8 +98,8 @@ impl State for RemapManifest {
impl RemapManifest {
fn build_remap_manifest_instructions(
source_regions: &[RegionDescriptor],
target_regions: &[RegionDescriptor],
source_regions: &[SourceRegionDescriptor],
target_regions: &[TargetRegionDescriptor],
region_mapping: &HashMap<RegionId, Vec<RegionId>>,
central_region_id: RegionId,
) -> Result<common_meta::instruction::RemapManifest> {
@@ -117,7 +117,7 @@ impl RemapManifest {
Ok(common_meta::instruction::RemapManifest {
region_id: central_region_id,
input_regions: source_regions.iter().map(|r| r.region_id).collect(),
input_regions: source_regions.iter().map(|r| r.region_id()).collect(),
region_mapping: region_mapping.clone(),
new_partition_exprs,
})

View File

@@ -19,7 +19,7 @@ use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::debug;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use snafu::{OptionExt, ensure};
use crate::error::{self, Result};
use crate::procedure::repartition::group::sync_region::SyncRegion;
@@ -27,21 +27,18 @@ use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{
Context, GroupId, GroupPrepareResult, State, region_routes,
};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
#[derive(Debug, Serialize, Deserialize)]
pub struct RepartitionStart;
/// Ensures that the partition expression of the region route matches the partition expression of the region descriptor.
fn ensure_region_route_expr_match(
/// Ensures that the partition expression of the source region route matches the source descriptor.
fn ensure_source_region_route_expr_match(
region_route: &RegionRoute,
region_descriptor: &RegionDescriptor,
source: &SourceRegionDescriptor,
) -> Result<RegionRoute> {
let actual = region_route.region.partition_expr();
let expected = region_descriptor
.partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?;
let expected = source.route_expr_for_rollback()?;
ensure!(
actual == expected,
error::PartitionExprMismatchSnafu {
@@ -60,8 +57,8 @@ impl RepartitionStart {
fn ensure_route_present(
group_id: GroupId,
region_routes: &[RegionRoute],
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
sources: &[SourceRegionDescriptor],
targets: &[TargetRegionDescriptor],
) -> Result<GroupPrepareResult> {
ensure!(
!sources.is_empty(),
@@ -78,12 +75,12 @@ impl RepartitionStart {
.iter()
.map(|s| {
region_routes_map
.get(&s.region_id)
.get(&s.region_id())
.context(error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: s.region_id,
region_id: s.region_id(),
})
.and_then(|r| ensure_region_route_expr_match(r, s))
.and_then(|r| ensure_source_region_route_expr_match(r, s))
})
.collect::<Result<Vec<_>>>()?;
let target_region_routes = targets
@@ -109,7 +106,7 @@ impl RepartitionStart {
}
);
}
let central_region = sources[0].region_id;
let central_region = sources[0].region_id();
let central_region_datanode = source_region_routes[0]
.leader_peer
.as_ref()
@@ -216,16 +213,14 @@ mod tests {
use crate::error::Error;
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
use crate::procedure::repartition::test_util::range_expr;
#[test]
fn test_ensure_route_present_missing_source_region() {
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
let source_region =
SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100));
let target_region = TargetRegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
@@ -249,11 +244,9 @@ mod tests {
#[test]
fn test_ensure_route_present_partition_expr_mismatch() {
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
let source_region =
SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100));
let target_region = TargetRegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
@@ -277,12 +270,69 @@ mod tests {
}
#[test]
fn test_ensure_route_present_missing_target_region() {
let source_region = RegionDescriptor {
fn test_ensure_route_present_default_source_matches_empty_partition_expr() {
let source_region = SourceRegionDescriptor::Default {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
let target_region = TargetRegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(1024, 1),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let result = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
);
assert!(result.is_ok());
}
#[test]
fn test_ensure_route_present_default_source_rejects_non_empty_partition_expr() {
let source_region = SourceRegionDescriptor::Default {
region_id: RegionId::new(1024, 1),
};
let target_region = TargetRegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let err = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
)
.unwrap_err();
assert_matches!(err, Error::PartitionExprMismatch { .. });
}
#[test]
fn test_ensure_route_present_missing_target_region() {
let source_region =
SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100));
let target_region = TargetRegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
@@ -307,11 +357,9 @@ mod tests {
#[test]
fn test_ensure_route_present_legacy_partition_expr_source() {
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
let source_region =
SourceRegionDescriptor::partitioned(RegionId::new(1024, 1), range_expr("x", 0, 100));
let target_region = TargetRegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};

View File

@@ -22,7 +22,7 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
impl UpdateMetadata {
/// Applies the new partition expressions for staging regions.
@@ -32,8 +32,8 @@ impl UpdateMetadata {
/// - Source region not found.
pub(crate) fn apply_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
sources: &[SourceRegionDescriptor],
targets: &[TargetRegionDescriptor],
pending_deallocate_region_ids: &[store_api::storage::RegionId],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
@@ -61,15 +61,16 @@ impl UpdateMetadata {
}
for source in sources {
let region_route = region_routes_map.get_mut(&source.region_id).context(
let region_id = source.region_id();
let region_route = region_routes_map.get_mut(&region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
region_id,
},
)?;
// Set leader staging state for the source region route.
region_route.set_leader_staging();
if pending_deallocate_region_ids.contains(&source.region_id) {
if pending_deallocate_region_ids.contains(&region_id) {
// When a region is pending deallocation, it should ignore all writes.
region_route.set_ignore_all_writes();
}
@@ -130,7 +131,7 @@ mod tests {
use uuid::Uuid;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
use crate::procedure::repartition::test_util::range_expr;
#[test]
@@ -166,11 +167,11 @@ mod tests {
..Default::default()
},
];
let source_region = RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
let source_region = SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 1),
range_expr("x", 0, 100),
);
let target_region = TargetRegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 0, 10),
};
@@ -196,6 +197,68 @@ mod tests {
assert!(!new_region_routes[2].is_leader_staging());
}
#[test]
fn test_generate_region_routes_with_reused_default_source_region() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let default_region_id = RegionId::new(table_id, 1);
let region_routes = vec![
RegionRoute {
region: Region {
id: default_region_id,
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
];
let source_region = SourceRegionDescriptor::Default {
region_id: default_region_id,
};
let reused_target_expr = range_expr("x", 0, 10);
let target_regions = vec![
TargetRegionDescriptor {
region_id: default_region_id,
partition_expr: reused_target_expr.clone(),
},
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 10, 20),
},
];
let new_region_routes = UpdateMetadata::apply_staging_region_routes(
group_id,
&[source_region],
&target_regions,
&[],
&region_routes,
)
.unwrap();
assert_eq!(
new_region_routes[0].region.partition_expr,
reused_target_expr.as_json_str().unwrap()
);
assert!(new_region_routes[0].is_leader_staging());
assert!(!new_region_routes[0].is_ignore_all_writes());
assert_eq!(
new_region_routes[1].region.partition_expr,
range_expr("x", 10, 20).as_json_str().unwrap()
);
assert!(new_region_routes[1].is_leader_staging());
}
#[test]
fn test_generate_region_routes_mark_pending_deallocate_reject_all_writes() {
let group_id = Uuid::new_v4();
@@ -221,11 +284,11 @@ mod tests {
..Default::default()
},
];
let source_region = RegionDescriptor {
region_id: pending_deallocate_region_id,
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
let source_region = SourceRegionDescriptor::partitioned(
pending_deallocate_region_id,
range_expr("x", 0, 100),
);
let target_region = TargetRegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 0, 10),
};

View File

@@ -22,13 +22,13 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
impl UpdateMetadata {
pub(crate) fn exit_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
sources: &[SourceRegionDescriptor],
targets: &[TargetRegionDescriptor],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
@@ -48,10 +48,11 @@ impl UpdateMetadata {
}
for source in sources {
let region_route = region_routes_map.get_mut(&source.region_id).context(
let region_id = source.region_id();
let region_route = region_routes_map.get_mut(&region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
region_id,
},
)?;
region_route.clear_leader_staging();
@@ -113,24 +114,25 @@ mod tests {
use uuid::Uuid;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
use crate::procedure::repartition::test_util::range_expr;
#[test]
fn test_exit_staging_region_routes_keep_reject_all_writes() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let source_region = RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
let source_region = SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 1),
range_expr("x", 0, 100),
);
let source_region_id = source_region.region_id();
let target_region = TargetRegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 0, 50),
};
let mut source_route = RegionRoute {
region: Region {
id: source_region.region_id,
id: source_region_id,
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
@@ -165,4 +167,40 @@ mod tests {
assert!(!new_region_routes[1].is_leader_staging());
assert!(new_region_routes[1].is_ignore_all_writes());
}
#[test]
fn test_exit_staging_region_routes_with_reused_default_source_region() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let default_region_id = RegionId::new(table_id, 1);
let source_region = SourceRegionDescriptor::Default {
region_id: default_region_id,
};
let target_region = TargetRegionDescriptor {
region_id: default_region_id,
partition_expr: range_expr("x", 0, 50),
};
let target_expr = target_region.partition_expr.as_json_str().unwrap();
let region_route = RegionRoute {
region: Region {
id: default_region_id,
partition_expr: target_expr.clone(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Staging),
..Default::default()
};
let new_region_routes = UpdateMetadata::exit_staging_region_routes(
group_id,
&[source_region],
&[target_region],
&[region_route],
)
.unwrap();
assert!(!new_region_routes[0].is_leader_staging());
assert_eq!(new_region_routes[0].region.partition_expr, target_expr);
}
}

View File

@@ -16,17 +16,137 @@ use std::cmp::Ordering;
use common_meta::rpc::router::RegionRoute;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::{self, Result};
use crate::procedure::repartition::group::GroupId;
/// Metadata describing a region involved in the plan.
/// Metadata describing a source region involved in the plan.
///
/// Source regions may represent either an existing partitioned region or the
/// default region of an unpartitioned table.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub enum SourceRegionDescriptor {
/// A regular partitioned source region.
Partitioned {
/// The region id of the source region.
region_id: RegionId,
/// The partition expression of the source region.
partition_expr: PartitionExpr,
},
/// The default source region of an unpartitioned table.
Default {
/// The region id of the default source region.
region_id: RegionId,
},
}
impl<'de> Deserialize<'de> for SourceRegionDescriptor {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
struct PartitionedSourceRegionDescriptor {
region_id: RegionId,
partition_expr: PartitionExpr,
}
#[derive(Deserialize)]
#[serde(untagged)]
enum SourceRegionDescriptorRepr {
Tagged(SourceRegionDescriptorTaggedRepr),
Legacy(PartitionedSourceRegionDescriptor),
}
#[derive(Deserialize)]
enum SourceRegionDescriptorTaggedRepr {
Partitioned {
region_id: RegionId,
partition_expr: PartitionExpr,
},
Default {
region_id: RegionId,
},
}
match SourceRegionDescriptorRepr::deserialize(deserializer)? {
SourceRegionDescriptorRepr::Tagged(SourceRegionDescriptorTaggedRepr::Partitioned {
region_id,
partition_expr,
}) => Ok(Self::Partitioned {
region_id,
partition_expr,
}),
SourceRegionDescriptorRepr::Tagged(SourceRegionDescriptorTaggedRepr::Default {
region_id,
}) => Ok(Self::Default { region_id }),
SourceRegionDescriptorRepr::Legacy(descriptor) => Ok(Self::Partitioned {
region_id: descriptor.region_id,
partition_expr: descriptor.partition_expr,
}),
}
}
}
impl SourceRegionDescriptor {
/// Creates a partitioned source region descriptor.
pub fn partitioned(region_id: RegionId, partition_expr: PartitionExpr) -> Self {
Self::Partitioned {
region_id,
partition_expr,
}
}
/// Returns the region id of this source descriptor.
pub fn region_id(&self) -> RegionId {
match self {
Self::Partitioned { region_id, .. } => *region_id,
Self::Default { region_id } => *region_id,
}
}
/// Returns the partition expression if this source is partitioned.
pub fn partition_expr(&self) -> Option<&PartitionExpr> {
match self {
Self::Partitioned { partition_expr, .. } => Some(partition_expr),
Self::Default { .. } => None,
}
}
/// Returns true if this source descriptor matches the route partition expression.
pub fn matches_route_expr(&self, route_expr: &str) -> Result<bool> {
match self {
Self::Partitioned { partition_expr, .. } => {
let expected = partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?;
Ok(route_expr == expected)
}
Self::Default { .. } => Ok(route_expr.is_empty()),
}
}
/// Returns the route partition expression to restore during rollback.
pub fn route_expr_for_rollback(&self) -> Result<String> {
match self {
Self::Partitioned { partition_expr, .. } => partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu),
Self::Default { .. } => Ok(String::new()),
}
}
}
/// Metadata describing a target region involved in the plan.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionDescriptor {
/// The region id of the region involved in the plan.
pub struct TargetRegionDescriptor {
/// The region id of the target region.
pub region_id: RegionId,
/// The partition expression of the region.
/// The partition expression of the target region.
pub partition_expr: PartitionExpr,
}
@@ -37,7 +157,7 @@ pub struct AllocationPlanEntry {
/// The group id for this plan entry.
pub group_id: GroupId,
/// Source region descriptors involved in the plan.
pub source_regions: Vec<RegionDescriptor>,
pub source_regions: Vec<SourceRegionDescriptor>,
/// The target partition expressions for the new or changed regions.
pub target_partition_exprs: Vec<PartitionExpr>,
/// For each `source_regions[k]`, the corresponding vector contains global
@@ -52,9 +172,9 @@ pub struct RepartitionPlanEntry {
/// The group id for this plan entry.
pub group_id: GroupId,
/// The source region descriptors involved in the plan.
pub source_regions: Vec<RegionDescriptor>,
pub source_regions: Vec<SourceRegionDescriptor>,
/// The target region descriptors involved in the plan.
pub target_regions: Vec<RegionDescriptor>,
pub target_regions: Vec<TargetRegionDescriptor>,
/// The region ids of the allocated regions.
pub allocated_region_ids: Vec<RegionId>,
/// The region ids of the regions that are pending deallocation.
@@ -69,7 +189,7 @@ pub struct RepartitionPlanEntry {
impl RepartitionPlanEntry {
/// Returns the target regions that are newly allocated.
pub(crate) fn allocate_regions(&self) -> Vec<&RegionDescriptor> {
pub(crate) fn allocate_regions(&self) -> Vec<&TargetRegionDescriptor> {
self.target_regions
.iter()
.filter(|r| self.allocated_region_ids.contains(&r.region_id))
@@ -111,7 +231,7 @@ pub fn convert_allocation_plan_to_repartition_plan(
.iter()
.skip(source_regions.len())
.map(|target_partition_expr| {
let desc = RegionDescriptor {
let desc = TargetRegionDescriptor {
region_id: RegionId::new(table_id, *next_region_number),
partition_expr: target_partition_expr.clone(),
};
@@ -128,10 +248,12 @@ pub fn convert_allocation_plan_to_repartition_plan(
let target_regions = source_regions
.iter()
.zip(target_partition_exprs.iter())
.map(|(source_region, target_partition_expr)| RegionDescriptor {
region_id: source_region.region_id,
partition_expr: target_partition_expr.clone(),
})
.map(
|(source_region, target_partition_expr)| TargetRegionDescriptor {
region_id: source_region.region_id(),
partition_expr: target_partition_expr.clone(),
},
)
.chain(pending_allocate_target_partition_exprs)
.collect::<Vec<_>>();
@@ -149,10 +271,12 @@ pub fn convert_allocation_plan_to_repartition_plan(
let target_regions = source_regions
.iter()
.zip(target_partition_exprs.iter())
.map(|(source_region, target_partition_expr)| RegionDescriptor {
region_id: source_region.region_id,
partition_expr: target_partition_expr.clone(),
})
.map(
|(source_region, target_partition_expr)| TargetRegionDescriptor {
region_id: source_region.region_id(),
partition_expr: target_partition_expr.clone(),
},
)
.collect::<Vec<_>>();
RepartitionPlanEntry {
@@ -171,16 +295,18 @@ pub fn convert_allocation_plan_to_repartition_plan(
.iter()
.take(target_partition_exprs.len())
.zip(target_partition_exprs.iter())
.map(|(source_region, target_partition_expr)| RegionDescriptor {
region_id: source_region.region_id,
partition_expr: target_partition_expr.clone(),
})
.map(
|(source_region, target_partition_expr)| TargetRegionDescriptor {
region_id: source_region.region_id(),
partition_expr: target_partition_expr.clone(),
},
)
.collect::<Vec<_>>();
let pending_deallocate_region_ids = source_regions
.iter()
.skip(target_partition_exprs.len())
.map(|source_region| source_region.region_id)
.map(|source_region| source_region.region_id())
.collect::<Vec<_>>();
RepartitionPlanEntry {
@@ -210,11 +336,140 @@ mod tests {
col: &str,
start: i64,
end: i64,
) -> RegionDescriptor {
RegionDescriptor {
region_id: RegionId::new(table_id, region_number),
partition_expr: range_expr(col, start, end),
}
) -> SourceRegionDescriptor {
SourceRegionDescriptor::partitioned(
RegionId::new(table_id, region_number),
range_expr(col, start, end),
)
}
#[test]
fn test_source_region_descriptor_deserializes_legacy_partitioned_shape() {
let table_id = 1024;
let region_id = RegionId::new(table_id, 1);
let partition_expr = range_expr("x", 0, 100);
let legacy_json = serde_json::json!({
"region_id": region_id,
"partition_expr": partition_expr,
});
let descriptor: SourceRegionDescriptor = serde_json::from_value(legacy_json).unwrap();
assert_eq!(
descriptor,
SourceRegionDescriptor::partitioned(region_id, partition_expr)
);
}
#[test]
fn test_source_region_descriptor_rejects_legacy_default_shape() {
let region_id = RegionId::new(1024, 1);
let default_json = serde_json::json!({
"region_id": region_id,
});
let err = serde_json::from_value::<SourceRegionDescriptor>(default_json).unwrap_err();
assert!(err.to_string().contains("data did not match any variant"));
}
#[test]
fn test_source_region_descriptor_roundtrip_tagged_partitioned_shape() {
let region_id = RegionId::new(1024, 1);
let partition_expr = range_expr("x", 0, 100);
let descriptor = SourceRegionDescriptor::partitioned(region_id, partition_expr.clone());
let value = serde_json::to_value(&descriptor).unwrap();
let decoded = serde_json::from_value::<SourceRegionDescriptor>(value.clone()).unwrap();
assert_eq!(
value,
serde_json::json!({
"Partitioned": {
"region_id": region_id,
"partition_expr": partition_expr,
}
})
);
assert_eq!(decoded, descriptor);
}
#[test]
fn test_source_region_descriptor_roundtrip_tagged_default_shape() {
let region_id = RegionId::new(1024, 1);
let descriptor = SourceRegionDescriptor::Default { region_id };
let value = serde_json::to_value(&descriptor).unwrap();
let decoded = serde_json::from_value::<SourceRegionDescriptor>(value.clone()).unwrap();
assert_eq!(
value,
serde_json::json!({
"Default": {
"region_id": region_id,
}
})
);
assert_eq!(decoded, descriptor);
}
#[test]
fn test_source_region_descriptor_rejects_invalid_partition_expr_shape() {
let region_id = RegionId::new(1024, 1);
let invalid_json = serde_json::json!({
"region_id": region_id,
"partition_expr": 42,
});
let err = serde_json::from_value::<SourceRegionDescriptor>(invalid_json).unwrap_err();
assert!(err.to_string().contains("data did not match any variant"));
}
#[test]
fn test_repartition_plan_entry_deserializes_legacy_source_regions() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let source_region_id = RegionId::new(table_id, 1);
let target_region_id = RegionId::new(table_id, 2);
let source_partition_expr = range_expr("x", 0, 100);
let target_partition_expr = range_expr("x", 0, 50);
let legacy_json = serde_json::json!({
"group_id": group_id,
"source_regions": [{
"region_id": source_region_id,
"partition_expr": source_partition_expr,
}],
"target_regions": [{
"region_id": target_region_id,
"partition_expr": target_partition_expr,
}],
"allocated_region_ids": [target_region_id],
"pending_deallocate_region_ids": [],
"transition_map": [[0]],
});
let plan: RepartitionPlanEntry = serde_json::from_value(legacy_json).unwrap();
assert_eq!(plan.group_id, group_id);
assert_eq!(
plan.source_regions,
vec![SourceRegionDescriptor::partitioned(
source_region_id,
source_partition_expr
)]
);
assert_eq!(
plan.target_regions,
vec![TargetRegionDescriptor {
region_id: target_region_id,
partition_expr: target_partition_expr,
}]
);
assert_eq!(plan.allocated_region_ids, vec![target_region_id]);
assert!(plan.pending_deallocate_region_ids.is_empty());
assert_eq!(plan.transition_map, vec![vec![0]]);
assert!(plan.original_target_routes.is_empty());
}
#[test]
@@ -468,6 +723,55 @@ mod tests {
assert_eq!(next_region_number, 6);
}
#[test]
fn test_convert_plan_allocate_default_source_region() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let mut next_region_number = 5;
let source_regions = vec![SourceRegionDescriptor::Default {
region_id: RegionId::new(table_id, 1),
}];
let target_partition_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
let allocation_plan = AllocationPlanEntry {
group_id,
source_regions: source_regions.clone(),
target_partition_exprs: target_partition_exprs.clone(),
transition_map: vec![vec![0, 1]],
};
let result = convert_allocation_plan_to_repartition_plan(
table_id,
&mut next_region_number,
&allocation_plan,
);
assert_eq!(result.source_regions, source_regions);
assert_eq!(result.target_regions.len(), 2);
assert_eq!(
result.target_regions[0].region_id,
RegionId::new(table_id, 1)
);
assert_eq!(
result.target_regions[0].partition_expr,
target_partition_exprs[0]
);
assert_eq!(
result.target_regions[1].region_id,
RegionId::new(table_id, 5)
);
assert_eq!(
result.target_regions[1].partition_expr,
target_partition_exprs[1]
);
assert_eq!(
result.allocated_region_ids,
vec![RegionId::new(table_id, 5)]
);
assert!(result.pending_deallocate_region_ids.is_empty());
assert_eq!(result.transition_map, vec![vec![0, 1]]);
assert_eq!(next_region_number, 6);
}
#[test]
fn test_convert_plan_deallocate_to_single_region() {
let group_id = Uuid::new_v4();

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::debug;
use partition::collider::Collider;
use partition::expr::PartitionExpr;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
@@ -26,7 +27,7 @@ use uuid::Uuid;
use crate::error::{self, Result};
use crate::procedure::repartition::allocate_region::AllocateRegion;
use crate::procedure::repartition::plan::{AllocationPlanEntry, RegionDescriptor};
use crate::procedure::repartition::plan::{AllocationPlanEntry, SourceRegionDescriptor};
use crate::procedure::repartition::repartition_end::RepartitionEnd;
use crate::procedure::repartition::{Context, State};
@@ -107,8 +108,12 @@ impl RepartitionStart {
from_exprs: &[PartitionExpr],
to_exprs: &[PartitionExpr],
) -> Result<Vec<AllocationPlanEntry>> {
let subtasks = subtask::create_subtasks(from_exprs, to_exprs)
.context(error::RepartitionCreateSubtasksSnafu)?;
let subtasks = if from_exprs.is_empty() {
Self::default_source_subtasks(to_exprs)?
} else {
subtask::create_subtasks(from_exprs, to_exprs)
.context(error::RepartitionCreateSubtasksSnafu)?
};
if subtasks.is_empty() {
return Ok(vec![]);
}
@@ -123,7 +128,7 @@ impl RepartitionStart {
fn build_plan_entries(
subtasks: Vec<RepartitionSubtask>,
source_index: &[RegionDescriptor],
source_index: &[SourceRegionDescriptor],
target_exprs: &[PartitionExpr],
) -> Vec<AllocationPlanEntry> {
subtasks
@@ -151,10 +156,32 @@ impl RepartitionStart {
.collect::<Vec<_>>()
}
fn default_source_subtasks(to_exprs: &[PartitionExpr]) -> Result<Vec<RepartitionSubtask>> {
ensure!(
!to_exprs.is_empty(),
error::UnexpectedSnafu {
violated: "Default source repartition expects non-empty target partition exprs",
}
);
Collider::new(to_exprs).context(error::RepartitionCreateSubtasksSnafu)?;
let to_expr_indices = (0..to_exprs.len()).collect::<Vec<_>>();
Ok(vec![RepartitionSubtask {
from_expr_indices: vec![0],
to_expr_indices: to_expr_indices.clone(),
transition_map: vec![to_expr_indices],
}])
}
fn source_region_descriptors(
from_exprs: &[PartitionExpr],
physical_route: &PhysicalTableRouteValue,
) -> Result<Vec<RegionDescriptor>> {
) -> Result<Vec<SourceRegionDescriptor>> {
if from_exprs.is_empty() {
return Self::default_source_region_descriptors(physical_route);
}
let existing_regions = physical_route
.region_routes
.iter()
@@ -178,13 +205,166 @@ impl RepartitionStart {
debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions);
})?;
Ok(RegionDescriptor {
region_id: matched_region_id,
partition_expr: expr.clone(),
})
Ok(SourceRegionDescriptor::partitioned(
matched_region_id,
expr.clone(),
))
})
.collect::<Result<Vec<_>>>()?;
Ok(descriptors)
}
fn default_source_region_descriptors(
physical_route: &PhysicalTableRouteValue,
) -> Result<Vec<SourceRegionDescriptor>> {
ensure!(
physical_route.region_routes.len() == 1,
error::UnexpectedSnafu {
violated: format!(
"Default source repartition expects exactly one source region, but got {}",
physical_route.region_routes.len()
),
}
);
let source_region = &physical_route.region_routes[0].region;
ensure!(
source_region.partition_expr().is_empty(),
error::UnexpectedSnafu {
violated: format!(
"Default source repartition expects an empty partition expr, but got {}",
source_region.partition_expr()
),
}
);
Ok(vec![SourceRegionDescriptor::Default {
region_id: source_region.id,
}])
}
}
#[cfg(test)]
mod tests {
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use datatypes::prelude::Value;
use partition::expr::{Operand, RestrictedOp};
use store_api::storage::RegionId;
use super::*;
use crate::procedure::repartition::test_util::{range_expr, test_region_route};
fn physical_route(region_routes: Vec<RegionRoute>) -> PhysicalTableRouteValue {
PhysicalTableRouteValue::new(region_routes)
}
#[test]
fn test_build_plan_with_default_source_region() {
let table_id = 1024;
let physical_route =
physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]);
let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
let plans = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap();
assert_eq!(plans.len(), 1);
let plan = &plans[0];
assert_eq!(
plan.source_regions,
vec![SourceRegionDescriptor::Default {
region_id: RegionId::new(table_id, 1)
}]
);
assert_eq!(plan.target_partition_exprs, to_exprs);
assert_eq!(plan.transition_map, vec![vec![0, 1]]);
}
#[test]
fn test_build_plan_with_default_source_rejects_non_empty_partition_expr() {
let table_id = 1024;
let physical_route = physical_route(vec![test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
)]);
let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
let err = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap_err();
assert!(err.to_string().contains("empty partition expr"));
}
#[test]
fn test_build_plan_with_default_source_rejects_multiple_regions() {
let table_id = 1024;
let physical_route = physical_route(vec![
test_region_route(RegionId::new(table_id, 1), ""),
test_region_route(RegionId::new(table_id, 2), ""),
]);
let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
let err = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap_err();
assert!(err.to_string().contains("exactly one source region"));
}
#[test]
fn test_build_plan_with_default_source_rejects_empty_targets() {
let table_id = 1024;
let physical_route =
physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]);
let err = RepartitionStart::build_plan(&physical_route, &[], &[]).unwrap_err();
assert!(err.to_string().contains("non-empty target partition exprs"));
}
#[test]
fn test_build_plan_with_default_source_rejects_invalid_targets() {
let table_id = 1024;
let physical_route =
physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]);
let invalid_to_expr = PartitionExpr::new(
Operand::Value(Value::Int64(1)),
RestrictedOp::Eq,
Operand::Value(Value::Int64(2)),
);
let err =
RepartitionStart::build_plan(&physical_route, &[], &[invalid_to_expr]).unwrap_err();
assert!(
err.to_string()
.contains("Failed to create repartition subtasks")
);
}
#[test]
fn test_build_plan_keeps_partitioned_source_matching() {
let table_id = 1024;
let from_exprs = vec![range_expr("x", 0, 100)];
let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
let physical_route = physical_route(vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: from_exprs[0].as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}]);
let plans = RepartitionStart::build_plan(&physical_route, &from_exprs, &to_exprs).unwrap();
assert_eq!(plans.len(), 1);
assert_eq!(
plans[0].source_regions,
vec![SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 1),
from_exprs[0].clone()
)]
);
}
}

View File

@@ -42,7 +42,7 @@ use uuid::Uuid;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::metasrv::MetasrvInfo;
use crate::procedure::repartition::group::{Context, PersistentContext, VolatileContext};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
use crate::procedure::repartition::{
Context as ParentContext, PersistentContext as ParentPersistentContext, RepartitionProcedure,
};
@@ -177,8 +177,8 @@ pub fn test_region_wal_options(region_numbers: &[RegionNumber]) -> HashMap<Regio
pub fn new_persistent_context(
table_id: TableId,
sources: Vec<RegionDescriptor>,
targets: Vec<RegionDescriptor>,
sources: Vec<SourceRegionDescriptor>,
targets: Vec<TargetRegionDescriptor>,
) -> PersistentContext {
PersistentContext {
group_id: Uuid::new_v4(),

View File

@@ -23,7 +23,7 @@ use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::{self, Result};
use crate::procedure::repartition::group::GroupId;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::SourceRegionDescriptor;
/// Returns the `datanode_table_value`
///
@@ -138,21 +138,23 @@ pub fn merge_and_validate_region_wal_options(
/// restored here.
pub fn rollback_group_metadata_routes(
group_id: GroupId,
source_regions: &[RegionDescriptor],
source_regions: &[SourceRegionDescriptor],
original_target_routes: &[RegionRoute],
allocated_region_ids: &[RegionId],
pending_deallocate_region_ids: &[RegionId],
region_routes_map: &mut HashMap<RegionId, &mut RegionRoute>,
) -> Result<()> {
for source in source_regions {
let region_route = region_routes_map.get_mut(&source.region_id).context(
let region_id = source.region_id();
let region_route = region_routes_map.get_mut(&region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region_id,
region_id,
},
)?;
region_route.clear_leader_staging();
if pending_deallocate_region_ids.contains(&source.region_id) {
region_route.region.partition_expr = source.route_expr_for_rollback()?;
if pending_deallocate_region_ids.contains(&region_id) {
region_route.clear_ignore_all_writes();
}
}
@@ -191,7 +193,7 @@ mod tests {
use super::*;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
use crate::procedure::repartition::test_util::range_expr;
/// Helper function to create a Kafka WAL option string from a topic name.
@@ -242,7 +244,7 @@ mod tests {
fn original_target_routes(
region_routes: &[RegionRoute],
targets: &[RegionDescriptor],
targets: &[TargetRegionDescriptor],
) -> Vec<RegionRoute> {
let target_ids = targets
.iter()
@@ -380,16 +382,16 @@ mod tests {
),
new_staged_region_route(RegionId::new(table_id, 3), "", None, false),
];
let sources = vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
}];
let sources = vec![SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 1),
range_expr("x", 0, 100),
)];
let targets = vec![
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 50),
},
RegionDescriptor {
TargetRegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 50, 100),
},
@@ -420,6 +422,60 @@ mod tests {
assert_eq!(applied_region_routes, original_region_routes);
}
#[test]
fn test_rollback_group_metadata_routes_default_source_restores_empty_expr() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let default_region_id = RegionId::new(table_id, 1);
let allocated_region_id = RegionId::new(table_id, 2);
let source_regions = vec![SourceRegionDescriptor::Default {
region_id: default_region_id,
}];
let target_regions = vec![
TargetRegionDescriptor {
region_id: default_region_id,
partition_expr: range_expr("x", 0, 50),
},
TargetRegionDescriptor {
region_id: allocated_region_id,
partition_expr: range_expr("x", 50, 100),
},
];
let current_region_routes = vec![
new_staged_region_route(default_region_id, "", None, false),
new_staged_region_route(allocated_region_id, "", None, false),
];
let original_target_routes = vec![current_region_routes[0].clone()];
let mut applied_region_routes = UpdateMetadata::apply_staging_region_routes(
group_id,
&source_regions,
&target_regions,
&[],
&current_region_routes,
)
.unwrap();
assert_eq!(
applied_region_routes[0].region.partition_expr,
range_expr("x", 0, 50).as_json_str().unwrap()
);
rollback_group_metadata_routes(
group_id,
&source_regions,
&original_target_routes,
&[allocated_region_id],
&[],
&mut applied_region_routes
.iter_mut()
.map(|route| (route.region.id, route))
.collect(),
)
.unwrap();
assert_eq!(applied_region_routes[0].region.partition_expr, "");
assert!(!applied_region_routes[0].is_leader_staging());
}
#[test]
fn test_rollback_group_metadata_routes_merge_case_is_idempotent() {
let group_id = Uuid::new_v4();
@@ -445,16 +501,16 @@ mod tests {
),
];
let sources = vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
},
SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 1),
range_expr("x", 0, 100),
),
SourceRegionDescriptor::partitioned(
RegionId::new(table_id, 2),
range_expr("x", 100, 200),
),
];
let targets = vec![RegionDescriptor {
let targets = vec![TargetRegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 200),
}];