fix(repartition): harden repartition rollback paths (#7918)

* fix(meta-srv): restore repartition group metadata on rollback

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test(meta-srv): add repartition group rollback coverage

* fix(meta-srv): rollback allocated regions on repartition failure

* test(meta-srv): cover repartition parent rollback flow

* test(meta-srv): cover repartition retry paths

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: persist repartition allocate state for retry and rollback

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: retry repartition mailbox channel close

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: refine logs

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-04-08 20:06:11 +08:00
committed by GitHub
parent 70ad412092
commit 6cc68ee8e1
17 changed files with 2066 additions and 162 deletions

View File

@@ -24,7 +24,6 @@ use common_base::Plugins;
use common_config::Configurable;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use common_meta::distributed_time_constants::META_LEASE_SECS;
use common_meta::election::CANDIDATE_LEASE_SECS;
use common_meta::election::etcd::EtcdElection;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
@@ -290,6 +289,7 @@ pub async fn metasrv_builder(
use std::time::Duration;
use common_meta::distributed_time_constants::POSTGRES_KEEP_ALIVE_SECS;
use common_meta::election::CANDIDATE_LEASE_SECS;
use common_meta::election::rds::postgres::{ElectionPgClient, PgElection};
use common_meta::kv_backend::rds::PgStore;
use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod};
@@ -354,6 +354,7 @@ pub async fn metasrv_builder(
(None, BackendImpl::MysqlStore) => {
use std::time::Duration;
use common_meta::election::CANDIDATE_LEASE_SECS;
use common_meta::election::rds::mysql::{ElectionMysqlClient, MySqlElection};
use common_meta::kv_backend::rds::MySqlStore;

View File

@@ -1136,6 +1136,12 @@ impl Error {
Error::RetryLater { .. }
| Error::RetryLaterWithSource { .. }
| Error::MailboxTimeout { .. }
) || matches!(
self,
Error::AllocateRegions { source, .. } if source.is_retry_later()
) || matches!(
self,
Error::DeallocateRegions { source, .. } if source.is_retry_later()
)
}
}
@@ -1324,3 +1330,35 @@ pub(crate) fn match_for_io_error(err_status: &tonic::Status) -> Option<&std::io:
err = err.source()?;
}
}
#[cfg(test)]
mod tests {
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use snafu::ResultExt;
use super::DeallocateRegionsSnafu;
#[test]
fn test_deallocate_regions_is_retryable_when_source_is_retry_later() {
let source = common_meta::error::Error::retry_later(MockError::new(StatusCode::Internal));
let err = Err::<(), _>(source)
.context(DeallocateRegionsSnafu { table_id: 1024_u32 })
.unwrap_err();
assert!(err.is_retryable());
}
#[test]
fn test_deallocate_regions_is_not_retryable_when_source_is_not_retry_later() {
let source = common_meta::error::UnexpectedSnafu {
err_msg: "mock error",
}
.build();
let err = Err::<(), _>(source)
.context(DeallocateRegionsSnafu { table_id: 1024_u32 })
.unwrap_err();
assert!(!err.is_retryable());
}
}

View File

@@ -23,7 +23,7 @@ pub mod repartition_start;
pub mod utils;
use std::any::Any;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::time::{Duration, Instant};
@@ -40,15 +40,15 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::rpc::router::RegionRoute;
use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -56,6 +56,8 @@ use store_api::storage::{RegionNumber, TableId};
use table::table_name::TableName;
use crate::error::{self, Result};
use crate::procedure::repartition::collect::ProcedureMeta;
use crate::procedure::repartition::deallocate_region::DeallocateRegion;
use crate::procedure::repartition::group::{
Context as RepartitionGroupContext, RepartitionGroupProcedure,
};
@@ -74,6 +76,12 @@ pub struct PersistentContext {
pub table_name: String,
pub table_id: TableId,
pub plans: Vec<RepartitionPlanEntry>,
/// Records failed sub-procedures for metadata rollback.
#[serde(default)]
pub failed_procedures: Vec<ProcedureMeta>,
#[serde(default)]
/// Records unknown sub-procedures for metadata rollback.
pub unknown_procedures: Vec<ProcedureMeta>,
/// The timeout for repartition operations.
#[serde(with = "humantime_serde", default = "default_timeout")]
pub timeout: Duration,
@@ -102,6 +110,8 @@ impl PersistentContext {
table_name,
table_id,
plans: vec![],
failed_procedures: vec![],
unknown_procedures: vec![],
timeout: timeout.unwrap_or_else(default_timeout),
}
}
@@ -393,6 +403,23 @@ impl Context {
.await;
Ok(())
}
pub fn register_operating_regions(
memory_region_keeper: &MemoryRegionKeeperRef,
region_routes: &[RegionRoute],
) -> Result<Vec<OperatingRegionGuard>> {
let mut operating_guards = Vec::with_capacity(region_routes.len());
for (region_id, datanode_id) in operating_leader_regions(region_routes) {
let guard = memory_region_keeper
.register(datanode_id, region_id)
.context(error::RegionOperatingRaceSnafu {
peer_id: datanode_id,
region_id,
})?;
operating_guards.push(guard);
}
Ok(operating_guards)
}
}
#[async_trait::async_trait]
@@ -456,6 +483,131 @@ impl RepartitionProcedure {
Ok(Self { state, context })
}
/// Returns whether parent rollback should remove this repartition's allocated regions.
///
/// This uses an "after AllocateRegion" semantic: once execution reaches
/// `AllocateRegion` or any later state, rollback must try to remove this round's
/// `allocated_region_ids` from table-route metadata when they exist.
///
/// State flow:
/// `RepartitionStart -> AllocateRegion -> Dispatch -> Collect -> DeallocateRegion -> RepartitionEnd`
/// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/// rollback allocated regions in metadata
///
/// Notes:
/// - `RepartitionStart`: no-op, because allocation has not happened yet.
/// - `AllocateRegion` / `Dispatch` / `Collect` rollback-active.
/// - `DeallocateRegion`: is not rollback-active.
/// - `RepartitionEnd`: no-op.
fn should_rollback_allocated_regions(&self) -> bool {
self.state.as_any().is::<allocate_region::AllocateRegion>()
|| self.state.as_any().is::<dispatch::Dispatch>()
|| self.state.as_any().is::<collect::Collect>()
}
fn rollback_allocated_region_ids(&self) -> HashSet<store_api::storage::RegionId> {
if self.state.as_any().is::<allocate_region::AllocateRegion>()
|| self.state.as_any().is::<dispatch::Dispatch>()
{
return self
.context
.persistent_ctx
.plans
.iter()
.flat_map(|plan| plan.allocated_region_ids.iter().copied())
.collect();
}
self.context
.persistent_ctx
.failed_procedures
.iter()
.chain(self.context.persistent_ctx.unknown_procedures.iter())
.flat_map(|procedure_meta| {
let plan_index = procedure_meta.plan_index;
self.context.persistent_ctx.plans[plan_index]
.allocated_region_ids
.iter()
.copied()
})
.collect()
}
fn filter_allocated_region_routes(
region_routes: &[RegionRoute],
allocated_region_ids: &HashSet<store_api::storage::RegionId>,
) -> Vec<RegionRoute> {
region_routes
.iter()
.filter(|route| !allocated_region_ids.contains(&route.region.id))
.cloned()
.collect()
}
async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
if !self.should_rollback_allocated_regions() {
return Ok(());
}
let table_id = self.context.persistent_ctx.table_id;
let allocated_region_ids = self.rollback_allocated_region_ids();
if allocated_region_ids.is_empty() {
return Ok(());
}
let table_lock = TableLock::Write(table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
let table_route_value = self.context.get_table_route_value().await?;
let current_region_routes = table_route_value.region_routes().unwrap();
let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
table_id,
current_region_routes,
&allocated_region_ids,
);
if !allocated_region_routes.is_empty() {
let table = TableName {
catalog_name: self.context.persistent_ctx.catalog_name.clone(),
schema_name: self.context.persistent_ctx.schema_name.clone(),
table_name: self.context.persistent_ctx.table_name.clone(),
};
// Memory guards are not required here,
// because the table metadata still contains routes for the deallocating regions.
if let Err(err) = DeallocateRegion::deallocate_regions(
&self.context.node_manager,
&self.context.leader_region_registry,
table,
table_id,
&allocated_region_routes,
)
.await
{
warn!(err; "Failed to drop allocated regions during repartition rollback, table_id: {}, regions: {:?}", table_id, allocated_region_ids);
}
}
let new_region_routes =
Self::filter_allocated_region_routes(current_region_routes, &allocated_region_ids);
if new_region_routes.len() != current_region_routes.len() {
self.context
.update_table_route(&table_route_value, new_region_routes, HashMap::new())
.await
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to rollback allocated region routes for repartition table: {}",
table_id
),
})?;
}
if let Err(err) = self.context.invalidate_table_cache().await {
warn!(err; "Failed to invalidate table cache during repartition rollback, table_id: {}", table_id);
}
Ok(())
}
}
#[async_trait::async_trait]
@@ -497,9 +649,14 @@ impl Procedure for RepartitionProcedure {
}
}
async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner(ctx)
.await
.map_err(ProcedureError::external)
}
fn rollback_supported(&self) -> bool {
// TODO(weny): support rollback.
false
true
}
fn dump(&self) -> ProcedureResult<String> {
@@ -624,3 +781,642 @@ impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use common_error::ext::BoxedError;
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_meta::ddl::test_util::datanode_handler::{
DatanodeWatcher, NaiveDatanodeHandler, UnexpectedErrorDatanodeHandler,
};
use common_meta::error;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::test_util::MockDatanodeManager;
use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::sync::mpsc;
use uuid::Uuid;
use super::*;
use crate::procedure::repartition::allocate_region::AllocateRegion;
use crate::procedure::repartition::collect::Collect;
use crate::procedure::repartition::deallocate_region::DeallocateRegion;
use crate::procedure::repartition::dispatch::Dispatch;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::repartition_end::RepartitionEnd;
use crate::procedure::repartition::test_util::{
TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids,
new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr,
test_region_route, test_region_wal_options,
};
fn test_plan(table_id: TableId) -> RepartitionPlanEntry {
RepartitionPlanEntry {
group_id: uuid::Uuid::new_v4(),
source_regions: vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
}],
target_regions: vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 50),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 50, 100),
},
],
allocated_region_ids: vec![RegionId::new(table_id, 3)],
pending_deallocate_region_ids: vec![],
transition_map: vec![vec![0, 1]],
}
}
fn test_procedure(state: Box<dyn State>, context: Context) -> RepartitionProcedure {
RepartitionProcedure { state, context }
}
fn test_context(env: &TestingEnv, table_id: TableId) -> Context {
let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
let ddl_ctx = env.ddl_context(node_manager);
let persistent_ctx = PersistentContext::new(
TableName::new("test_catalog", "test_schema", "test_table"),
table_id,
None,
);
Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
)
}
#[test]
fn test_filter_allocated_region_routes() {
let table_id = 1024;
let region_routes = vec![
test_region_route(RegionId::new(table_id, 1), "a"),
test_region_route(RegionId::new(table_id, 2), "b"),
];
let allocated_region_ids = HashSet::from([RegionId::new(table_id, 2)]);
let new_region_routes = RepartitionProcedure::filter_allocated_region_routes(
&region_routes,
&allocated_region_ids,
);
assert_eq!(new_region_routes.len(), 1);
assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1));
}
#[test]
fn test_should_rollback_allocated_regions() {
let env = TestingEnv::new();
let table_id = 1024;
let procedure = test_procedure(
Box::new(RepartitionStart::new(vec![], vec![])),
test_context(&env, table_id),
);
assert!(!procedure.should_rollback_allocated_regions());
let procedure = test_procedure(
Box::new(AllocateRegion::new(vec![])),
test_context(&env, table_id),
);
assert!(procedure.should_rollback_allocated_regions());
let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id));
assert!(procedure.should_rollback_allocated_regions());
let procedure =
test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id));
assert!(procedure.should_rollback_allocated_regions());
let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id));
assert!(!procedure.should_rollback_allocated_regions());
let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id));
assert!(!procedure.should_rollback_allocated_regions());
}
#[tokio::test]
async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
let ddl_ctx = env.ddl_context(node_manager);
let original_region_routes = vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 50, 100).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 3), ""),
];
env.create_physical_table_metadata_with_wal_options(
table_id,
original_region_routes,
test_region_wal_options(&[1, 2]),
)
.await;
let mut persistent_ctx = PersistentContext::new(
TableName::new("test_catalog", "test_schema", "test_table"),
table_id,
None,
);
persistent_ctx.plans = vec![test_plan(table_id)];
persistent_ctx.failed_procedures = vec![ProcedureMeta {
plan_index: 0,
group_id: Uuid::new_v4(),
procedure_id: ProcedureId::random(),
}];
let context = Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
);
let mut procedure = RepartitionProcedure {
state: Box::new(Dispatch),
context,
};
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
let region_routes = current_parent_region_routes(&procedure.context).await;
assert_eq!(region_routes.len(), 2);
assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
}
#[tokio::test]
async fn test_repartition_rollback_removes_allocated_routes_from_allocate() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
let ddl_ctx = env.ddl_context(node_manager);
let original_region_routes = vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 50, 100).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 3), ""),
];
env.create_physical_table_metadata_with_wal_options(
table_id,
original_region_routes,
test_region_wal_options(&[1, 2]),
)
.await;
let mut persistent_ctx = PersistentContext::new(
TableName::new("test_catalog", "test_schema", "test_table"),
table_id,
None,
);
persistent_ctx.plans = vec![test_plan(table_id)];
let context = Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
);
let mut procedure = RepartitionProcedure {
state: Box::new(AllocateRegion::new(vec![])),
context,
};
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
let region_routes = current_parent_region_routes(&procedure.context).await;
assert_eq!(region_routes.len(), 2);
assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
}
#[tokio::test]
async fn test_repartition_rollback_from_collect_only_removes_failed_allocated_routes() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
let ddl_ctx = env.ddl_context(node_manager);
let original_region_routes = vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 3), ""),
test_region_route(RegionId::new(table_id, 4), ""),
];
env.create_physical_table_metadata_with_wal_options(
table_id,
original_region_routes,
test_region_wal_options(&[1, 2, 3, 4]),
)
.await;
let mut persistent_ctx = PersistentContext::new(
TableName::new("test_catalog", "test_schema", "test_table"),
table_id,
None,
);
let failed_plan = test_plan(table_id);
let succeeded_plan = RepartitionPlanEntry {
group_id: Uuid::new_v4(),
source_regions: vec![RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
}],
target_regions: vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 150),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 4),
partition_expr: range_expr("x", 150, 200),
},
],
allocated_region_ids: vec![RegionId::new(table_id, 4)],
pending_deallocate_region_ids: vec![],
transition_map: vec![vec![0]],
};
persistent_ctx.plans = vec![failed_plan, succeeded_plan];
persistent_ctx.failed_procedures = vec![ProcedureMeta {
plan_index: 0,
group_id: persistent_ctx.plans[0].group_id,
procedure_id: ProcedureId::random(),
}];
let context = Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
);
let mut procedure = RepartitionProcedure {
state: Box::new(Collect::new(vec![])),
context,
};
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
let region_routes = current_parent_region_routes(&procedure.context).await;
assert_eq!(region_routes.len(), 3);
assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4));
}
#[tokio::test]
async fn test_repartition_rollback_is_idempotent() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
let ddl_ctx = env.ddl_context(node_manager);
let original_region_routes = vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 50, 100).as_json_str().unwrap(),
),
test_region_route(RegionId::new(table_id, 3), ""),
];
env.create_physical_table_metadata_with_wal_options(
table_id,
original_region_routes,
test_region_wal_options(&[1, 2]),
)
.await;
let mut persistent_ctx = PersistentContext::new(
TableName::new("test_catalog", "test_schema", "test_table"),
table_id,
None,
);
persistent_ctx.plans = vec![test_plan(table_id)];
persistent_ctx.failed_procedures = vec![ProcedureMeta {
plan_index: 0,
group_id: Uuid::new_v4(),
procedure_id: ProcedureId::random(),
}];
let context = Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
);
let mut procedure = RepartitionProcedure {
state: Box::new(Dispatch),
context,
};
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
let once = current_parent_region_routes(&procedure.context).await;
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
let twice = current_parent_region_routes(&procedure.context).await;
assert_eq!(once, twice);
assert_eq!(once.len(), 2);
assert_eq!(once[0].region.id, RegionId::new(table_id, 1));
assert_eq!(once[1].region.id, RegionId::new(table_id, 2));
}
#[tokio::test]
async fn test_repartition_procedure_flow_split_failed_and_full_rollback() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
env.create_physical_table_metadata_for_repartition(
table_id,
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
],
test_region_wal_options(&[1, 2]),
)
.await;
let context = new_parent_context(&env, node_manager, table_id);
let mut procedure = RepartitionProcedure::new(
vec![range_expr("x", 0, 100)],
vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
context,
);
let start_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(!start_status.need_persist());
let start_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(start_status.need_persist());
assert_parent_state::<AllocateRegion>(&procedure);
let allocate_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(allocate_status.need_persist());
assert_parent_state::<Dispatch>(&procedure);
assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
let plan = &procedure.context.persistent_ctx.plans[0];
let expected_plan = test_plan(table_id);
assert_eq!(plan.source_regions, expected_plan.source_regions);
assert_eq!(plan.target_regions, expected_plan.target_regions);
assert_eq!(
plan.allocated_region_ids,
expected_plan.allocated_region_ids
);
assert_eq!(
plan.pending_deallocate_region_ids,
expected_plan.pending_deallocate_region_ids
);
assert_eq!(plan.transition_map, expected_plan.transition_map);
assert_eq!(
current_parent_region_routes(&procedure.context).await,
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(0)),
..Default::default()
},
]
);
let dispatch_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(!dispatch_status.need_persist());
let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
assert_eq!(subprocedure_ids.len(), 1);
assert_parent_state::<Collect>(&procedure);
let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
MockError::new(StatusCode::Internal),
)));
let collect_ctx = procedure_context_with_receivers(HashMap::from([(
subprocedure_ids[0],
procedure_state_receiver(failed_state),
)]));
let err = procedure.execute(&collect_ctx).await.unwrap_err();
assert!(!err.is_retry_later());
assert_parent_state::<Collect>(&procedure);
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
let region_routes = current_parent_region_routes(&procedure.context).await;
assert_eq!(
region_routes,
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
]
);
}
#[tokio::test]
async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() {
common_telemetry::init_default_ut_logging();
let env = TestingEnv::new();
let table_id = 1024;
let (tx, _rx) = mpsc::channel(8);
let should_retry = Arc::new(AtomicBool::new(true));
let datanode_handler = DatanodeWatcher::new(tx).with_handler(move |_, _| {
if should_retry.swap(false, Ordering::SeqCst) {
return Err(error::Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
err_msg: "retry later",
}
.build(),
),
clean_poisons: false,
});
}
Ok(api::region::RegionResponse::new(0))
});
let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
env.create_physical_table_metadata_for_repartition(
table_id,
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
],
test_region_wal_options(&[1, 2]),
)
.await;
let context = new_parent_context(&env, node_manager, table_id);
let mut procedure = RepartitionProcedure::new(
vec![range_expr("x", 0, 100)],
vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
context,
);
let start_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(!start_status.need_persist());
let start_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(start_status.need_persist());
assert_parent_state::<AllocateRegion>(&procedure);
let err = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap_err();
assert!(err.is_retry_later());
assert_parent_state::<AllocateRegion>(&procedure);
assert!(!procedure.context.persistent_ctx.plans.is_empty());
assert_eq!(
current_parent_region_routes(&procedure.context).await,
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
]
);
let allocate_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(allocate_status.need_persist());
assert_parent_state::<Dispatch>(&procedure);
assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
let plan = &procedure.context.persistent_ctx.plans[0];
let expected_plan = test_plan(table_id);
assert_eq!(plan.source_regions, expected_plan.source_regions);
assert_eq!(plan.target_regions, expected_plan.target_regions);
assert_eq!(
plan.allocated_region_ids,
expected_plan.allocated_region_ids
);
assert_eq!(plan.transition_map, expected_plan.transition_map);
assert_eq!(
current_parent_region_routes(&procedure.context).await,
vec![
test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
),
test_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
),
RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(0)),
..Default::default()
},
]
);
let dispatch_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(!dispatch_status.need_persist());
let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
assert_eq!(subprocedure_ids.len(), 1);
assert_parent_state::<Collect>(&procedure);
}
}

View File

@@ -21,12 +21,11 @@ use common_meta::ddl::create_table::template::{
};
use common_meta::lock_key::TableLock;
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use common_telemetry::{debug, info};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::ResultExt;
use store_api::storage::{RegionNumber, TableId};
use table::metadata::TableInfo;
use table::table_reference::TableReference;
@@ -40,14 +39,103 @@ use crate::procedure::repartition::plan::{
};
use crate::procedure::repartition::{Context, State};
#[derive(Debug, Clone, Serialize)]
pub enum AllocateRegion {
Build(BuildPlan),
Execute(ExecutePlan),
}
impl<'de> Deserialize<'de> for AllocateRegion {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
enum CurrentAllocateRegion {
Build(BuildPlan),
Execute(ExecutePlan),
}
#[derive(Deserialize)]
struct LegacyAllocateRegion {
plan_entries: Vec<AllocationPlanEntry>,
}
#[derive(Deserialize)]
#[serde(untagged)]
enum AllocateRegionRepr {
Current(CurrentAllocateRegion),
Legacy(LegacyAllocateRegion),
}
match AllocateRegionRepr::deserialize(deserializer)? {
AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => {
Ok(Self::Build(build_plan))
}
AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => {
Ok(Self::Execute(execute_plan))
}
AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan {
plan_entries: legacy.plan_entries,
})),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllocateRegion {
pub struct BuildPlan {
plan_entries: Vec<AllocationPlanEntry>,
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for AllocateRegion {
impl BuildPlan {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let timer = Instant::now();
let table_id = ctx.persistent_ctx.table_id;
let table_route_value = ctx.get_table_route_value().await?;
let mut next_region_number =
AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap());
// Converts allocation plan to repartition plan.
let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans(
table_id,
&mut next_region_number,
&self.plan_entries,
);
let plan_count = repartition_plan_entries.len();
let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
info!(
"Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
table_id, plan_count, to_allocate
);
// If no region to allocate, directly dispatch the plan.
if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 {
ctx.persistent_ctx.plans = repartition_plan_entries;
ctx.update_allocate_region_elapsed(timer.elapsed());
return Ok((Box::new(Dispatch), Status::executing(true)));
}
ctx.persistent_ctx.plans = repartition_plan_entries;
debug!(
"Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}",
table_id,
timer.elapsed()
);
Ok((
Box::new(AllocateRegion::Execute(ExecutePlan)),
Status::executing(true),
))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutePlan;
impl ExecutePlan {
async fn next(
&mut self,
ctx: &mut Context,
@@ -55,36 +143,13 @@ impl State for AllocateRegion {
) -> Result<(Box<dyn State>, Status)> {
let timer = Instant::now();
let table_id = ctx.persistent_ctx.table_id;
let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans);
let region_number_and_partition_exprs =
AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
let table_info_value = ctx.get_table_info_value().await?;
let table_route_value = ctx.get_table_route_value().await?;
// Safety: it is physical table route value.
let region_routes = table_route_value.region_routes().unwrap();
let mut next_region_number =
Self::get_next_region_number(table_route_value.max_region_number().unwrap());
// Converts allocation plan to repartition plan.
let repartition_plan_entries = Self::convert_to_repartition_plans(
table_id,
&mut next_region_number,
&self.plan_entries,
);
let plan_count = repartition_plan_entries.len();
let to_allocate = Self::count_regions_to_allocate(&repartition_plan_entries);
info!(
"Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
table_id, plan_count, to_allocate
);
// If no region to allocate, directly dispatch the plan.
if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 {
ctx.persistent_ctx.plans = repartition_plan_entries;
ctx.update_allocate_region_elapsed(timer.elapsed());
return Ok((Box::new(Dispatch), Status::executing(true)));
}
let allocate_regions = Self::collect_allocate_regions(&repartition_plan_entries);
let region_number_and_partition_exprs =
Self::prepare_region_allocation_data(&allocate_regions)?;
let table_info_value = ctx.get_table_info_value().await?;
let new_allocated_region_routes = ctx
.region_routes_allocator
.allocate(
@@ -122,12 +187,13 @@ impl State for AllocateRegion {
table_id, new_region_count, new_regions_brief
);
let _operating_guards = Self::register_operating_regions(
// The table route metadata is not updated yet; register it in memory for region lease renewal.
let _operating_guards = Context::register_operating_regions(
&ctx.memory_region_keeper,
&new_allocated_region_routes,
)?;
// Allocates the regions on datanodes.
Self::allocate_regions(
AllocateRegion::allocate_regions(
&ctx.node_manager,
&table_info_value.table_info,
&new_allocated_region_routes,
@@ -135,21 +201,33 @@ impl State for AllocateRegion {
)
.await?;
// TODO(weny): for metric engine, sync logical regions from the the central region.
// Updates the table routes.
let table_lock = TableLock::Write(table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
let new_region_routes =
Self::generate_region_routes(region_routes, &new_allocated_region_routes);
AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
.await?;
ctx.invalidate_table_cache().await?;
ctx.persistent_ctx.plans = repartition_plan_entries;
ctx.update_allocate_region_elapsed(timer.elapsed());
Ok((Box::new(Dispatch), Status::executing(true)))
}
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for AllocateRegion {
async fn next(
&mut self,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
match self {
AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await,
AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await,
}
}
fn as_any(&self) -> &dyn Any {
self
@@ -158,24 +236,7 @@ impl State for AllocateRegion {
impl AllocateRegion {
pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
Self { plan_entries }
}
fn register_operating_regions(
memory_region_keeper: &MemoryRegionKeeperRef,
region_routes: &[RegionRoute],
) -> Result<Vec<OperatingRegionGuard>> {
let mut operating_guards = Vec::with_capacity(region_routes.len());
for (region_id, datanode_id) in operating_leader_regions(region_routes) {
let guard = memory_region_keeper
.register(datanode_id, region_id)
.context(error::RegionOperatingRaceSnafu {
peer_id: datanode_id,
region_id,
})?;
operating_guards.push(guard);
}
Ok(operating_guards)
AllocateRegion::Build(BuildPlan { plan_entries })
}
fn generate_region_routes(
@@ -300,6 +361,7 @@ mod tests {
use uuid::Uuid;
use super::*;
use crate::procedure::repartition::State;
use crate::procedure::repartition::test_util::range_expr;
fn create_region_descriptor(
@@ -488,4 +550,71 @@ mod tests {
assert!(!result[0].1.is_empty());
assert!(!result[1].1.is_empty());
}
#[test]
fn test_allocate_region_state_backward_compatibility() {
// Arrange
let serialized = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#;
// Act
let state: Box<dyn State> = serde_json::from_str(serialized).unwrap();
// Assert
let allocate_region = state
.as_any()
.downcast_ref::<AllocateRegion>()
.expect("expected AllocateRegion state");
match allocate_region {
AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
AllocateRegion::Execute(_) => panic!("expected build plan"),
}
}
#[test]
fn test_allocate_region_state_round_trip() {
// Arrange
let state: Box<dyn State> = Box::new(AllocateRegion::new(vec![]));
// Act
let serialized = serde_json::to_string(&state).unwrap();
let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();
// Assert
assert_eq!(
serialized,
r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"#
);
let allocate_region = deserialized
.as_any()
.downcast_ref::<AllocateRegion>()
.expect("expected AllocateRegion state");
match allocate_region {
AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
AllocateRegion::Execute(_) => panic!("expected build plan"),
}
}
#[test]
fn test_allocate_region_execute_state_round_trip() {
// Arrange
let state: Box<dyn State> = Box::new(AllocateRegion::Execute(ExecutePlan));
// Act
let serialized = serde_json::to_string(&state).unwrap();
let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();
// Assert
assert_eq!(
serialized,
r#"{"repartition_state":"AllocateRegion","Execute":null}"#
);
let allocate_region = deserialized
.as_any()
.downcast_ref::<AllocateRegion>()
.expect("expected AllocateRegion state");
match allocate_region {
AllocateRegion::Execute(_) => {}
AllocateRegion::Build(_) => panic!("expected execute plan"),
}
}
}

View File

@@ -94,17 +94,28 @@ impl State for Collect {
}
}
let inflight = self.inflight_procedures.len();
let succeeded = self.succeeded_procedures.len();
let failed = self.failed_procedures.len();
let unknown = self.unknown_procedures.len();
info!(
"Collected repartition group results for table_id: {}, inflight: {}, succeeded: {}, failed: {}, unknown: {}",
table_id, inflight, succeeded, failed, unknown
"Collected repartition group results for table_id: {}, succeeded: {}, failed: {}, unknown: {}",
table_id, succeeded, failed, unknown
);
if failed > 0 || unknown > 0 {
// TODO(weny): retry the failed or unknown procedures.
ctx.persistent_ctx
.failed_procedures
.extend(self.failed_procedures.iter());
ctx.persistent_ctx
.unknown_procedures
.extend(self.unknown_procedures.iter());
return crate::error::UnexpectedSnafu {
violated: format!(
"Repartition groups failed or became unknown, table_id: {}, failed: {}, unknown: {}",
table_id, failed, unknown
),
}
.fail();
}
if let Some(start_time) = ctx.volatile_ctx.dispatch_start_time.take() {
@@ -118,3 +129,139 @@ impl State for Collect {
self
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_meta::test_util::MockDatanodeManager;
use common_procedure::{
Context as ProcedureContext, ContextProvider, Error as ProcedureError, ProcedureId,
ProcedureState,
};
use common_procedure_test::MockContextProvider;
use tokio::sync::watch;
use super::*;
use crate::procedure::repartition::PersistentContext;
use crate::procedure::repartition::test_util::TestingEnv;
struct FailedProcedureContextProvider {
receiver: watch::Receiver<ProcedureState>,
inner: MockContextProvider,
}
#[async_trait::async_trait]
impl ContextProvider for FailedProcedureContextProvider {
async fn procedure_state(
&self,
procedure_id: ProcedureId,
) -> common_procedure::Result<Option<ProcedureState>> {
self.inner.procedure_state(procedure_id).await
}
async fn procedure_state_receiver(
&self,
_procedure_id: ProcedureId,
) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> {
Ok(Some(self.receiver.clone()))
}
async fn try_put_poison(
&self,
key: &common_procedure::PoisonKey,
procedure_id: ProcedureId,
) -> common_procedure::Result<()> {
self.inner.try_put_poison(key, procedure_id).await
}
async fn acquire_lock(
&self,
key: &common_procedure::StringKey,
) -> common_procedure::local::DynamicKeyLockGuard {
self.inner.acquire_lock(key).await
}
}
#[tokio::test]
async fn test_collect_returns_error_when_unknown_exists() {
let env = TestingEnv::new();
let ddl_ctx = env.ddl_context(Arc::new(MockDatanodeManager::new(())));
let persistent_ctx = PersistentContext::new(
table::table_name::TableName::new("test_catalog", "test_schema", "test_table"),
1024,
None,
);
let mut ctx = crate::procedure::repartition::Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
);
let mut state = Collect {
inflight_procedures: vec![],
succeeded_procedures: vec![],
failed_procedures: vec![],
unknown_procedures: vec![ProcedureMeta {
plan_index: 0,
group_id: uuid::Uuid::new_v4(),
procedure_id: common_procedure::ProcedureId::random(),
}],
};
let err = state
.next(&mut ctx, &TestingEnv::procedure_context())
.await
.unwrap_err();
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_collect_returns_error_when_failed_exists() {
let env = TestingEnv::new();
let ddl_ctx = env.ddl_context(Arc::new(MockDatanodeManager::new(())));
let persistent_ctx = PersistentContext::new(
table::table_name::TableName::new("test_catalog", "test_schema", "test_table"),
1024,
None,
);
let mut ctx = crate::procedure::repartition::Context::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
);
let procedure_id = common_procedure::ProcedureId::random();
let (tx, rx) = watch::channel(ProcedureState::Running);
tx.send(ProcedureState::failed(Arc::new(ProcedureError::external(
MockError::new(StatusCode::Internal),
))))
.unwrap();
let procedure_ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(FailedProcedureContextProvider {
receiver: rx,
inner: MockContextProvider::default(),
}),
};
let mut state = Collect {
inflight_procedures: vec![ProcedureMeta {
plan_index: 0,
group_id: uuid::Uuid::new_v4(),
procedure_id,
}],
succeeded_procedures: vec![],
failed_procedures: vec![],
unknown_procedures: vec![],
};
let err = state.next(&mut ctx, &procedure_ctx).await.unwrap_err();
assert_eq!(state.failed_procedures.len(), 1);
assert_eq!(state.unknown_procedures.len(), 0);
assert!(!err.is_retryable());
}
}

View File

@@ -88,7 +88,8 @@ impl State for DeallocateRegion {
&ctx.persistent_ctx.schema_name,
&ctx.persistent_ctx.table_name,
);
// Deallocates the regions on datanodes.
// Memory guards are not required here,
// because the table metadata still contains routes for the deallocating regions.
Self::deallocate_regions(
&ctx.node_manager,
&ctx.leader_region_registry,
@@ -116,7 +117,7 @@ impl State for DeallocateRegion {
}
impl DeallocateRegion {
async fn deallocate_regions(
pub(crate) async fn deallocate_regions(
node_manager: &NodeManagerRef,
leader_region_registry: &LeaderRegionRegistryRef,
table: TableName,
@@ -141,7 +142,7 @@ impl DeallocateRegion {
Ok(())
}
fn filter_deallocatable_region_routes(
pub(crate) fn filter_deallocatable_region_routes(
table_id: TableId,
region_routes: &[RegionRoute],
pending_deallocate_region_ids: &HashSet<RegionId>,
@@ -165,7 +166,7 @@ impl DeallocateRegion {
.collect::<Vec<_>>()
}
fn generate_region_routes(
pub(crate) fn generate_region_routes(
region_routes: &[RegionRoute],
pending_deallocate_region_ids: &HashSet<RegionId>,
) -> Vec<RegionRoute> {
@@ -181,12 +182,21 @@ impl DeallocateRegion {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use common_meta::ddl::test_util::datanode_handler::RetryErrorDatanodeHandler;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::test_util::MockDatanodeManager;
use store_api::storage::{RegionId, TableId};
use crate::error::Error;
use crate::procedure::repartition::State;
use crate::procedure::repartition::deallocate_region::DeallocateRegion;
use crate::procedure::repartition::plan::RepartitionPlanEntry;
use crate::procedure::repartition::test_util::{
TestingEnv, current_parent_region_routes, new_parent_context,
};
fn test_region_routes(table_id: TableId) -> Vec<RegionRoute> {
vec![
@@ -238,4 +248,36 @@ mod tests {
assert_eq!(new_region_routes.len(), 1);
assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 2));
}
#[tokio::test]
async fn test_next_retryable_when_deallocate_regions_retry_later() {
let env = TestingEnv::new();
let table_id = 1024;
let original_routes = test_region_routes(table_id);
env.create_physical_table_metadata(table_id, original_routes.clone())
.await;
let node_manager = Arc::new(MockDatanodeManager::new(RetryErrorDatanodeHandler));
let mut ctx = new_parent_context(&env, node_manager, table_id);
ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
group_id: uuid::Uuid::new_v4(),
source_regions: vec![],
target_regions: vec![],
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![RegionId::new(table_id, 1)],
transition_map: vec![],
}];
let mut state = DeallocateRegion;
let err = state
.next(&mut ctx, &TestingEnv::procedure_context())
.await
.unwrap_err();
assert!(matches!(err, Error::DeallocateRegions { .. }));
assert!(err.is_retryable());
assert_eq!(current_parent_region_routes(&ctx).await, original_routes);
}
}

View File

@@ -31,7 +31,7 @@ use crate::procedure::repartition::{self, Context, State};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Dispatch;
fn build_region_mapping(
pub(crate) fn build_region_mapping(
source_regions: &[RegionDescriptor],
target_regions: &[RegionDescriptor],
transition_map: &[Vec<usize>],
@@ -106,7 +106,11 @@ impl State for Dispatch {
Ok((
Box::new(Collect::new(procedure_metas)),
Status::suspended(procedures, true),
// The state is not persisted after sub-procedures are spawned.
// If metasrv restarts before all sub-procedures complete,
// it restores from the `Dispatch` state and re-dispatches them.
// This is safe because the sub-procedures are idempotent.
Status::suspended(procedures, false),
))
}

View File

@@ -41,14 +41,18 @@ use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;
use crate::error::{self, Result};
use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
use crate::procedure::repartition::group::remap_manifest::RemapManifest;
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::utils::get_datanode_table_value;
use crate::procedure::repartition::{self};
@@ -192,6 +196,62 @@ impl RepartitionGroupProcedure {
Ok(Self { state, context })
}
async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
if !self.should_rollback_metadata() {
return Ok(());
}
let table_lock =
common_meta::lock_key::TableLock::Write(self.context.persistent_ctx.table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
UpdateMetadata::RollbackStaging
.rollback_staging_regions(&mut self.context)
.await?;
if let Err(err) = self.context.invalidate_table_cache().await {
warn!(
err;
"Failed to broadcast the invalidate table cache message during repartition group rollback"
);
}
Ok(())
}
/// Returns whether group rollback should revert staging metadata.
///
/// This uses an "after metadata apply, before exit staging" semantic.
/// Once execution reaches `UpdateMetadata::ApplyStaging` or any later staging state,
/// rollback must restore table-route metadata back to the pre-apply view.
///
/// State flow:
/// `RepartitionStart -> SyncRegion -> UpdateMetadata::ApplyStaging -> EnterStagingRegion`
/// ` -> RemapManifest -> ApplyStagingManifest -> UpdateMetadata::ExitStaging -> RepartitionEnd`
/// ` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^`
/// ` rollback staging metadata`
///
/// Notes:
/// - `RepartitionStart` / `SyncRegion`: no-op, metadata has not been staged yet.
/// - `UpdateMetadata::ApplyStaging` / `EnterStagingRegion` / `RemapManifest` /
/// `ApplyStagingManifest` / `UpdateMetadata::RollbackStaging`: rollback-active.
/// - `UpdateMetadata::ExitStaging` / `RepartitionEnd`: excluded, because metadata has
/// already moved into the post-commit exit path.
fn should_rollback_metadata(&self) -> bool {
self.state.as_any().is::<EnterStagingRegion>()
|| self.state.as_any().is::<RemapManifest>()
|| self.state.as_any().is::<ApplyStagingManifest>()
|| self
.state
.as_any()
.downcast_ref::<UpdateMetadata>()
.is_some_and(|state| {
matches!(
state,
UpdateMetadata::ApplyStaging | UpdateMetadata::RollbackStaging
)
})
}
}
#[async_trait::async_trait]
@@ -200,6 +260,12 @@ impl Procedure for RepartitionGroupProcedure {
Self::TYPE_NAME
}
async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
self.rollback_inner(ctx)
.await
.map_err(ProcedureError::external)
}
#[tracing::instrument(skip_all, fields(
state = %self.state.name(),
table_id = self.context.persistent_ctx.table_id,
@@ -238,7 +304,7 @@ impl Procedure for RepartitionGroupProcedure {
}
fn rollback_supported(&self) -> bool {
false
true
}
fn dump(&self) -> ProcedureResult<String> {
@@ -304,7 +370,7 @@ impl Context {
pub struct GroupPrepareResult {
/// The validated source region routes.
pub source_routes: Vec<RegionRoute>,
/// The validated target region routes.
/// Validated target region routes used for metadata rollback (logical rollback).
pub target_routes: Vec<RegionRoute>,
/// The primary source region id (first source region), used for retrieving region options.
pub central_region: RegionId,
@@ -599,12 +665,149 @@ pub(crate) trait State: Sync + Send + Debug {
mod tests {
use std::assert_matches;
use std::sync::Arc;
use std::time::Duration;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::test_util::MockKvBackendBuilder;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use partition::expr::PartitionExpr;
use store_api::storage::RegionId;
use super::{
Context, PersistentContext, RepartitionGroupProcedure, RepartitionStart, State,
region_routes,
};
use crate::error::Error;
use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
use crate::procedure::repartition::dispatch::build_region_mapping;
use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
use crate::procedure::repartition::group::remap_manifest::RemapManifest;
use crate::procedure::repartition::group::repartition_start::RepartitionStart as GroupRepartitionStart;
use crate::procedure::repartition::group::sync_region::SyncRegion;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan;
use crate::procedure::repartition::repartition_start::RepartitionStart as ParentRepartitionStart;
use crate::procedure::repartition::test_util::{
TestingEnv, new_persistent_context, range_expr,
};
struct GroupRollbackFixture {
context: Context,
original_region_routes: Vec<RegionRoute>,
next_state: Option<Box<dyn State>>,
}
async fn new_group_rollback_fixture(
original_region_routes: Vec<RegionRoute>,
from_exprs: Vec<PartitionExpr>,
to_exprs: Vec<PartitionExpr>,
sync_region: bool,
) -> GroupRollbackFixture {
let env = TestingEnv::new();
let procedure_ctx = TestingEnv::procedure_context();
let table_id = 1024;
let mut next_region_number = 10;
env.create_physical_table_metadata(table_id, original_region_routes.clone())
.await;
let (_, physical_route) = env
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await
.unwrap();
let allocation_plans =
ParentRepartitionStart::build_plan(&physical_route, &from_exprs, &to_exprs).unwrap();
assert_eq!(allocation_plans.len(), 1);
let repartition_plan = plan::convert_allocation_plan_to_repartition_plan(
table_id,
&mut next_region_number,
&allocation_plans[0],
);
let region_mapping = build_region_mapping(
&repartition_plan.source_regions,
&repartition_plan.target_regions,
&repartition_plan.transition_map,
);
let persistent_context = PersistentContext::new(
repartition_plan.group_id,
table_id,
"test_catalog".to_string(),
"test_schema".to_string(),
repartition_plan.source_regions,
repartition_plan.target_regions,
region_mapping,
sync_region,
repartition_plan.allocated_region_ids,
repartition_plan.pending_deallocate_region_ids,
Duration::from_secs(120),
);
let mut context = env.create_context(persistent_context);
let (next_state, _) = GroupRepartitionStart
.next(&mut context, &procedure_ctx)
.await
.unwrap();
GroupRollbackFixture {
context,
original_region_routes,
next_state: Some(next_state),
}
}
async fn new_split_group_rollback_fixture(sync_region: bool) -> GroupRollbackFixture {
new_group_rollback_fixture(
vec![
new_region_route(RegionId::new(1024, 1), Some(range_expr("x", 0, 100))),
new_region_route(RegionId::new(1024, 2), Some(range_expr("x", 100, 200))),
new_region_route(RegionId::new(1024, 10), None),
],
vec![range_expr("x", 0, 100)],
vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
sync_region,
)
.await
}
async fn new_merge_group_rollback_fixture(sync_region: bool) -> GroupRollbackFixture {
new_group_rollback_fixture(
vec![
new_region_route(RegionId::new(1024, 1), Some(range_expr("x", 0, 100))),
new_region_route(RegionId::new(1024, 2), Some(range_expr("x", 100, 200))),
new_region_route(RegionId::new(1024, 3), Some(range_expr("x", 200, 300))),
],
vec![range_expr("x", 0, 100), range_expr("x", 100, 200)],
vec![range_expr("x", 0, 200)],
sync_region,
)
.await
}
async fn stage_metadata(context: &mut Context) {
UpdateMetadata::ApplyStaging
.apply_staging_regions(context)
.await
.unwrap();
}
fn new_region_route(region_id: RegionId, partition_expr: Option<PartitionExpr>) -> RegionRoute {
RegionRoute {
region: Region {
id: region_id,
partition_expr: partition_expr
.map(|expr| expr.as_json_str().unwrap())
.unwrap_or_default(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}
}
#[tokio::test]
async fn test_get_table_route_value_not_found_error() {
@@ -653,4 +856,198 @@ mod tests {
let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
assert!(err.is_retryable());
}
#[tokio::test]
async fn test_group_rollback_supported() {
let env = TestingEnv::new();
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let procedure = RepartitionGroupProcedure {
state: Box::new(RepartitionStart),
context: env.create_context(persistent_context),
};
assert!(procedure.rollback_supported());
}
#[tokio::test]
async fn test_group_rollback_is_noop_before_apply_staging() {
let env = TestingEnv::new();
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let ctx = env.create_context(persistent_context.clone());
let mut procedure = RepartitionGroupProcedure {
state: Box::new(RepartitionStart),
context: ctx,
};
let provider = Arc::new(MockContextProvider::new(Default::default()));
let procedure_ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider,
};
procedure.rollback(&procedure_ctx).await.unwrap();
assert!(procedure.state.as_any().is::<RepartitionStart>());
assert_eq!(procedure.context.persistent_ctx, persistent_context);
}
async fn assert_noop_rollback(
fixture: GroupRollbackFixture,
state: Box<dyn State>,
assert_state: impl FnOnce(&dyn State),
) {
let original_region_routes = fixture.original_region_routes.clone();
let procedure_ctx = TestingEnv::procedure_context();
let mut procedure = RepartitionGroupProcedure {
state,
context: fixture.context,
};
procedure.rollback(&procedure_ctx).await.unwrap();
assert_state(&*procedure.state);
let table_route_value = procedure
.context
.get_table_route_value()
.await
.unwrap()
.into_inner();
let region_routes = region_routes(
procedure.context.persistent_ctx.table_id,
&table_route_value,
)
.unwrap();
assert_eq!(region_routes.clone(), original_region_routes);
}
async fn assert_metadata_rollback_restores_table_route(
mut fixture: GroupRollbackFixture,
state: Box<dyn State>,
) {
let original_region_routes = fixture.original_region_routes.clone();
let procedure_ctx = TestingEnv::procedure_context();
stage_metadata(&mut fixture.context).await;
let mut procedure = RepartitionGroupProcedure {
state,
context: fixture.context,
};
procedure.rollback(&procedure_ctx).await.unwrap();
let table_route_value = procedure
.context
.get_table_route_value()
.await
.unwrap()
.into_inner();
let region_routes = region_routes(
procedure.context.persistent_ctx.table_id,
&table_route_value,
)
.unwrap();
assert_eq!(region_routes.clone(), original_region_routes);
}
#[tokio::test]
async fn test_group_rollback_is_noop_in_sync_region() {
let mut fixture = new_split_group_rollback_fixture(true).await;
assert!(
fixture
.next_state
.as_ref()
.unwrap()
.as_any()
.is::<SyncRegion>()
);
let state = fixture.next_state.take().unwrap();
assert_noop_rollback(fixture, state, |state| {
assert!(state.as_any().is::<SyncRegion>());
})
.await;
}
#[tokio::test]
async fn test_group_rollback_is_noop_in_exit_staging() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_noop_rollback(fixture, Box::new(UpdateMetadata::ExitStaging), |state| {
assert!(state.as_any().is::<UpdateMetadata>());
assert!(matches!(
state.as_any().downcast_ref::<UpdateMetadata>(),
Some(UpdateMetadata::ExitStaging)
));
})
.await;
}
#[tokio::test]
async fn test_group_rollback_restores_split_routes_from_apply_staging() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_metadata_rollback_restores_table_route(
fixture,
Box::new(UpdateMetadata::ApplyStaging),
)
.await;
}
#[tokio::test]
async fn test_group_rollback_restores_split_routes_from_enter_staging_region() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_metadata_rollback_restores_table_route(fixture, Box::new(EnterStagingRegion)).await;
}
#[tokio::test]
async fn test_group_rollback_restores_split_routes_from_remap_manifest() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_metadata_rollback_restores_table_route(fixture, Box::new(RemapManifest)).await;
}
#[tokio::test]
async fn test_group_rollback_restores_split_routes_from_apply_staging_manifest() {
let fixture = new_split_group_rollback_fixture(false).await;
assert_metadata_rollback_restores_table_route(fixture, Box::new(ApplyStagingManifest))
.await;
}
#[tokio::test]
async fn test_group_rollback_restores_merge_routes_and_is_idempotent() {
let mut fixture = new_merge_group_rollback_fixture(false).await;
let original_region_routes = fixture.original_region_routes.clone();
let procedure_ctx = TestingEnv::procedure_context();
stage_metadata(&mut fixture.context).await;
let mut procedure = RepartitionGroupProcedure {
state: Box::new(UpdateMetadata::ApplyStaging),
context: fixture.context,
};
procedure.rollback(&procedure_ctx).await.unwrap();
let table_route_value = procedure
.context
.get_table_route_value()
.await
.unwrap()
.into_inner();
let once = region_routes(
procedure.context.persistent_ctx.table_id,
&table_route_value,
)
.unwrap()
.clone();
procedure.rollback(&procedure_ctx).await.unwrap();
let table_route_value = procedure
.context
.get_table_route_value()
.await
.unwrap()
.into_inner();
let twice = region_routes(
procedure.context.persistent_ctx.table_id,
&table_route_value,
)
.unwrap()
.clone();
assert_eq!(once, original_region_routes);
assert_eq!(once, twice);
}
}

View File

@@ -332,7 +332,14 @@ impl ApplyStagingManifest {
);
Ok(())
}
},
Err(error::Error::MailboxChannelClosed {..})=> error::RetryLaterSnafu {
reason: format!(
"Mailbox closed when sending apply staging manifests to datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
),
}.fail()?,
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for apply staging manifests on datanode {:?}, elapsed: {:?}",

View File

@@ -315,7 +315,14 @@ impl EnterStagingRegion {
);
Ok(())
}
},
Err(error::Error::MailboxChannelClosed {..})=> error::RetryLaterSnafu {
reason: format!(
"Mailbox closed when sending enter staging regions to datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
),
}.fail()?,
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for enter staging regions on datanode {:?}, elapsed: {:?}",

View File

@@ -184,6 +184,14 @@ impl RemapManifest {
Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
}
Err(error::Error::MailboxChannelClosed { .. }) => error::RetryLaterSnafu {
reason: format!(
"Mailbox closed when sending remap manifests to datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
),
}
.fail()?,
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}",

View File

@@ -273,6 +273,14 @@ impl SyncRegion {
}
Ok(())
}
Err(error::Error::MailboxChannelClosed { .. }) => error::RetryLaterSnafu {
reason: format!(
"Mailbox closed when sending sync region to datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
),
}
.fail()?,
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",

View File

@@ -30,7 +30,7 @@ impl UpdateMetadata {
/// Abort:
/// - Target region not found.
/// - Source region not found.
fn apply_staging_region_routes(
pub(crate) fn apply_staging_region_routes(
group_id: GroupId,
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
@@ -50,10 +50,12 @@ impl UpdateMetadata {
region_id: target.region_id,
},
)?;
// Set the new partition expression for the target region route.
region_route.region.partition_expr = target
.partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?;
// Set leader staging state and write route policy for the target region route.
region_route.set_leader_staging();
region_route.clear_ignore_all_writes();
}
@@ -65,6 +67,7 @@ impl UpdateMetadata {
region_id: source.region_id,
},
)?;
// Set leader staging state for the source region route.
region_route.set_leader_staging();
if pending_deallocate_region_ids.contains(&source.region_id) {
// When a region is pending deallocation, it should ignore all writes.

View File

@@ -18,10 +18,12 @@ use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_telemetry::{error, info};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{Context, GroupId, region_routes};
use crate::procedure::repartition::plan::RegionDescriptor;
impl UpdateMetadata {
/// Rolls back the staging regions.
@@ -31,8 +33,9 @@ impl UpdateMetadata {
/// - Target region not found.
fn rollback_staging_region_routes(
group_id: GroupId,
source_routes: &[RegionRoute],
target_routes: &[RegionRoute],
sources: &[RegionDescriptor],
original_target_routes: &[RegionRoute],
pending_deallocate_region_ids: &[RegionId],
current_region_routes: &[RegionRoute],
) -> Result<Vec<RegionRoute>> {
let mut region_routes = current_region_routes.to_vec();
@@ -40,26 +43,35 @@ impl UpdateMetadata {
.iter_mut()
.map(|route| (route.region.id, route))
.collect::<HashMap<_, _>>();
for source in source_routes {
let region_route = region_routes_map.get_mut(&source.region.id).context(
for source in sources {
let region_route = region_routes_map.get_mut(&source.region_id).context(
error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: source.region.id,
region_id: source.region_id,
},
)?;
region_route.region.partition_expr = source.region.partition_expr.clone();
// Clean leader staging state for source regions.
region_route.clear_leader_staging();
region_route.clear_ignore_all_writes();
if pending_deallocate_region_ids.contains(&source.region_id) {
// Clean ignore all writes state for source regions if it's pending to be deallocated,
// which means the source region is merged into the target region.
region_route.clear_ignore_all_writes();
}
}
for target in target_routes {
for target in original_target_routes {
let region_route = region_routes_map.get_mut(&target.region.id).context(
error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: target.region.id,
},
)?;
// Revert the partition expression and write route policy to the original value for the target region.
region_route.region.partition_expr = target.region.partition_expr.clone();
region_route.write_route_policy = target.write_route_policy;
// Clean leader staging state for target regions.
region_route.clear_leader_staging();
}
@@ -83,8 +95,9 @@ impl UpdateMetadata {
let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
let new_region_routes = Self::rollback_staging_region_routes(
group_id,
&prepare_result.source_routes,
&ctx.persistent_ctx.sources,
&prepare_result.target_routes,
&ctx.persistent_ctx.pending_deallocate_region_ids,
region_routes,
)?;
@@ -113,87 +126,176 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use common_meta::peer::Peer;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::test_util::range_expr;
fn new_region_route(
region_id: RegionId,
partition_expr: &str,
leader_state: Option<LeaderState>,
ignore_all_writes: bool,
) -> RegionRoute {
let mut route = RegionRoute {
region: Region {
id: region_id,
partition_expr: partition_expr.to_string(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state,
..Default::default()
};
if ignore_all_writes {
route.set_ignore_all_writes();
}
route
}
fn original_target_routes(
region_routes: &[RegionRoute],
targets: &[RegionDescriptor],
) -> Vec<RegionRoute> {
let target_ids = targets
.iter()
.map(|target| target.region_id)
.collect::<HashSet<_>>();
region_routes
.iter()
.filter(|route| target_ids.contains(&route.region.id))
.cloned()
.collect()
}
#[test]
fn test_rollback_staging_region_routes() {
fn test_rollback_staging_region_routes_split_case() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let region_routes = vec![
{
let mut route = RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Staging),
..Default::default()
};
route.set_ignore_all_writes();
route
let original_region_routes = vec![
new_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
None,
false,
),
new_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
None,
false,
),
new_region_route(RegionId::new(table_id, 3), "", None, false),
];
let sources = vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
}];
let targets = vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 50),
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Staging),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(table_id, 3),
partition_expr: String::new(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
leader_state: Some(LeaderState::Downgrading),
..Default::default()
RegionDescriptor {
region_id: RegionId::new(table_id, 3),
partition_expr: range_expr("x", 50, 100),
},
];
let source_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 20).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let target_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 0, 20).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let new_region_routes = UpdateMetadata::rollback_staging_region_routes(
let applied_region_routes = UpdateMetadata::apply_staging_region_routes(
group_id,
&source_routes,
&target_routes,
&region_routes,
&sources,
&targets,
&[],
&original_region_routes,
)
.unwrap();
assert!(!new_region_routes[0].is_leader_staging());
assert!(!new_region_routes[0].is_ignore_all_writes());
assert_eq!(
new_region_routes[0].region.partition_expr,
range_expr("x", 0, 20).as_json_str().unwrap(),
);
assert!(!new_region_routes[1].is_leader_staging());
assert!(!new_region_routes[1].is_ignore_all_writes());
assert!(new_region_routes[2].is_leader_downgrading());
let target_routes = original_target_routes(&original_region_routes, &targets);
let new_region_routes = UpdateMetadata::rollback_staging_region_routes(
group_id,
&sources,
&target_routes,
&[],
&applied_region_routes,
)
.unwrap();
assert_eq!(new_region_routes, original_region_routes);
}
#[test]
fn test_rollback_staging_region_routes_merge_case_is_idempotent() {
let group_id = Uuid::new_v4();
let table_id = 1024;
let original_region_routes = vec![
new_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
None,
false,
),
new_region_route(
RegionId::new(table_id, 2),
&range_expr("x", 100, 200).as_json_str().unwrap(),
None,
false,
),
new_region_route(
RegionId::new(table_id, 3),
&range_expr("x", 200, 300).as_json_str().unwrap(),
None,
false,
),
];
let sources = vec![
RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 100),
},
RegionDescriptor {
region_id: RegionId::new(table_id, 2),
partition_expr: range_expr("x", 100, 200),
},
];
let targets = vec![RegionDescriptor {
region_id: RegionId::new(table_id, 1),
partition_expr: range_expr("x", 0, 200),
}];
let target_routes = original_target_routes(&original_region_routes, &targets);
let applied_region_routes = UpdateMetadata::apply_staging_region_routes(
group_id,
&sources,
&targets,
&[RegionId::new(table_id, 2)],
&original_region_routes,
)
.unwrap();
let once = UpdateMetadata::rollback_staging_region_routes(
group_id,
&sources,
&target_routes,
&[RegionId::new(table_id, 2)],
&applied_region_routes,
)
.unwrap();
let twice = UpdateMetadata::rollback_staging_region_routes(
group_id,
&sources,
&target_routes,
&[RegionId::new(table_id, 2)],
&once,
)
.unwrap();
assert_eq!(once, original_region_routes);
assert_eq!(once, twice);
}
}

View File

@@ -102,7 +102,7 @@ impl State for RepartitionStart {
}
impl RepartitionStart {
fn build_plan(
pub(crate) fn build_plan(
physical_route: &PhysicalTableRouteValue,
from_exprs: &[PartitionExpr],
to_exprs: &[PartitionExpr],

View File

@@ -16,22 +16,41 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use common_meta::ddl::DdlContext;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::{new_test_table_info, new_test_table_info_with_name};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceBuilder;
use common_meta::test_util::new_ddl_context_with_kv_backend;
use common_procedure::{
Context as ProcedureContext, ContextProvider, ProcedureId, ProcedureState, Status,
};
use common_procedure_test::MockContextProvider;
use common_wal::options::{KafkaWalOptions, WalOptions};
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
use store_api::storage::TableId;
use store_api::storage::{RegionId, RegionNumber, TableId};
use table::table_name::TableName;
use tokio::sync::watch;
use uuid::Uuid;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::metasrv::MetasrvInfo;
use crate::procedure::repartition::group::{Context, PersistentContext, VolatileContext};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::{
Context as ParentContext, PersistentContext as ParentPersistentContext, RepartitionProcedure,
};
use crate::procedure::test_util::MailboxContext;
/// `TestingEnv` provides components during the tests.
pub struct TestingEnv {
pub kv_backend: KvBackendRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub mailbox_ctx: MailboxContext,
pub server_addr: String,
@@ -45,13 +64,14 @@ impl Default for TestingEnv {
impl TestingEnv {
pub fn new() -> Self {
let kv_backend = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox_ctx = MailboxContext::new(mailbox_sequence);
Self {
kv_backend,
table_metadata_manager,
mailbox_ctx,
server_addr: "localhost".to_string(),
@@ -76,6 +96,65 @@ impl TestingEnv {
volatile_ctx: VolatileContext::default(),
}
}
pub fn procedure_context() -> ProcedureContext {
ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
}
}
pub async fn create_physical_table_metadata(
&self,
table_id: TableId,
region_routes: Vec<RegionRoute>,
) {
self.create_physical_table_metadata_with_wal_options(
table_id,
region_routes,
HashMap::default(),
)
.await;
}
pub async fn create_physical_table_metadata_with_wal_options(
&self,
table_id: TableId,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) {
self.table_metadata_manager
.create_table_metadata(
new_test_table_info(table_id),
TableRouteValue::physical(region_routes),
region_wal_options,
)
.await
.unwrap();
}
pub async fn create_physical_table_metadata_for_repartition(
&self,
table_id: TableId,
region_routes: Vec<RegionRoute>,
region_wal_options: HashMap<RegionNumber, String>,
) {
let mut table_info = new_test_table_info_with_name(table_id, "test_table");
table_info.meta.column_ids = vec![0, 1, 2];
self.table_metadata_manager
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
region_wal_options,
)
.await
.unwrap();
}
pub fn ddl_context(&self, node_manager: NodeManagerRef) -> DdlContext {
new_ddl_context_with_kv_backend(node_manager, self.kv_backend.clone())
}
}
pub fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
@@ -84,6 +163,18 @@ pub fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
.and(col(col_name).lt(Value::Int64(end)))
}
pub fn test_region_wal_options(region_numbers: &[RegionNumber]) -> HashMap<RegionNumber, String> {
let wal_options = serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: "test_topic".to_string(),
}))
.unwrap();
region_numbers
.iter()
.map(|region_number| (*region_number, wal_options.clone()))
.collect()
}
pub fn new_persistent_context(
table_id: TableId,
sources: Vec<RegionDescriptor>,
@@ -105,3 +196,110 @@ pub fn new_persistent_context(
timeout: Duration::from_secs(120),
}
}
pub fn test_region_route(region_id: RegionId, partition_expr: &str) -> RegionRoute {
RegionRoute {
region: Region {
id: region_id,
partition_expr: partition_expr.to_string(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}
}
pub async fn current_parent_region_routes(ctx: &ParentContext) -> Vec<RegionRoute> {
let table_route_value = ctx.get_table_route_value().await.unwrap().into_inner();
table_route_value.region_routes().unwrap().clone()
}
pub fn new_parent_context(
env: &TestingEnv,
node_manager: NodeManagerRef,
table_id: TableId,
) -> ParentContext {
let ddl_ctx = env.ddl_context(node_manager);
let persistent_ctx = ParentPersistentContext::new(
TableName::new("test_catalog", "test_schema", "test_table"),
table_id,
None,
);
ParentContext::new(
&ddl_ctx,
env.mailbox_ctx.mailbox().clone(),
env.server_addr.clone(),
persistent_ctx,
)
}
pub fn assert_parent_state<T: 'static>(procedure: &RepartitionProcedure) {
assert!(procedure.state.as_any().is::<T>());
}
pub fn extract_subprocedure_ids(status: Status) -> Vec<ProcedureId> {
let Status::Suspended { subprocedures, .. } = status else {
panic!("expected suspended status");
};
subprocedures
.into_iter()
.map(|procedure| procedure.id)
.collect()
}
pub fn procedure_state_receiver(state: ProcedureState) -> watch::Receiver<ProcedureState> {
let (tx, rx) = watch::channel(ProcedureState::Running);
tx.send(state).unwrap();
rx
}
pub fn procedure_context_with_receivers(
receivers: HashMap<ProcedureId, watch::Receiver<ProcedureState>>,
) -> ProcedureContext {
ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(ProcedureStateReceiverProvider {
receivers,
inner: MockContextProvider::default(),
}),
}
}
struct ProcedureStateReceiverProvider {
receivers: HashMap<ProcedureId, watch::Receiver<ProcedureState>>,
inner: MockContextProvider,
}
#[async_trait::async_trait]
impl ContextProvider for ProcedureStateReceiverProvider {
async fn procedure_state(
&self,
procedure_id: ProcedureId,
) -> common_procedure::Result<Option<ProcedureState>> {
self.inner.procedure_state(procedure_id).await
}
async fn procedure_state_receiver(
&self,
procedure_id: ProcedureId,
) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> {
Ok(self.receivers.get(&procedure_id).cloned())
}
async fn try_put_poison(
&self,
key: &common_procedure::PoisonKey,
procedure_id: ProcedureId,
) -> common_procedure::Result<()> {
self.inner.try_put_poison(key, procedure_id).await
}
async fn acquire_lock(
&self,
key: &common_procedure::StringKey,
) -> common_procedure::local::DynamicKeyLockGuard {
self.inner.acquire_lock(key).await
}
}

View File

@@ -190,6 +190,23 @@ pub(crate) async fn flush_region(
operation: "Flush regions",
}
.fail(),
Err(error::Error::MailboxChannelClosed { .. }) => match error_strategy {
ErrorStrategy::Ignore => {
warn!(
"Failed to flush regions({:?}), the datanode({}) is unreachable(MailboxChannelClosed). Skip flush operation.",
region_ids, datanode
);
Ok(())
}
ErrorStrategy::Retry => error::RetryLaterSnafu {
reason: format!(
"Mailbox closed when sending flush region to datanode {:?}, elapsed: {:?}",
datanode,
now.elapsed()
),
}
.fail()?,
},
Err(err) => Err(err),
}
}