feat: introduce batch region migration (#7176)

* feat: introduce batch region migration

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: try fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix get table route

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: avoid cloning vec

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-11-14 16:15:18 +08:00
committed by GitHub
parent c1e762960a
commit d5f52013ec
18 changed files with 1213 additions and 897 deletions

View File

@@ -164,6 +164,25 @@ impl DatanodeTableManager {
.transpose()
}
/// Fetches the [`DatanodeTableValue`]s stored under `keys` with a single batch request.
///
/// The returned map holds one entry per key-value pair present in the backend
/// response, keyed by the decoded [`DatanodeTableKey`].
///
/// # Errors
///
/// Fails if the backend request fails, or if any returned key or value cannot be decoded.
pub async fn batch_get(
    &self,
    keys: &[DatanodeTableKey],
) -> Result<HashMap<DatanodeTableKey, DatanodeTableValue>> {
    let raw_keys = keys.iter().map(|key| key.to_bytes()).collect();
    let request = BatchGetRequest::default().with_keys(raw_keys);
    let response = self.kv_backend.batch_get(request).await?;

    // Decode pair by pair and bail out on the first entry that fails to decode.
    let mut values = HashMap::with_capacity(response.kvs.len());
    for kv in response.kvs {
        let key = DatanodeTableKey::from_bytes(&kv.key)?;
        let value = DatanodeTableValue::try_from_raw_value(&kv.value)?;
        values.insert(key, value);
    }
    Ok(values)
}
pub fn tables(
&self,
datanode_id: DatanodeId,

View File

@@ -661,13 +661,32 @@ impl TableRouteStorage {
/// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
let mut table_routes = self.batch_get_inner(table_ids).await?;
self.remap_routes_addresses(&mut table_routes).await?;
let raw_table_routes = self.batch_get_inner(table_ids).await?;
Ok(table_routes)
Ok(raw_table_routes
.into_iter()
.map(|v| v.map(|x| x.inner))
.collect())
}
async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
/// Returns the table routes of `table_ids`, each wrapped in a [`DeserializedValueWithBytes`].
///
/// The result is a vector of [`Option<DeserializedValueWithBytes<TableRouteValue>>`].
/// Note: the addresses of the returned table routes are remapped, but their raw byte
/// representations are not updated accordingly.
pub async fn batch_get_with_raw_bytes(
    &self,
    table_ids: &[TableId],
) -> Result<Vec<Option<DeserializedValueWithBytes<TableRouteValue>>>> {
    let mut routes = self.batch_get_inner(table_ids).await?;
    // Remap through a mutable borrow before handing the routes to the caller.
    self.remap_routes_addresses(&mut routes).await?;
    Ok(routes)
}
async fn batch_get_inner(
&self,
table_ids: &[TableId],
) -> Result<Vec<Option<DeserializedValueWithBytes<TableRouteValue>>>> {
let keys = table_ids
.iter()
.map(|id| TableRouteKey::new(*id).to_bytes())
@@ -685,7 +704,7 @@ impl TableRouteStorage {
keys.into_iter()
.map(|key| {
if let Some(value) = kvs.get(&key) {
Ok(Some(TableRouteValue::try_from_raw_value(value)?))
Ok(Some(DeserializedValueWithBytes::from_inner_slice(value)?))
} else {
Ok(None)
}
@@ -695,14 +714,14 @@ impl TableRouteStorage {
async fn remap_routes_addresses(
&self,
table_routes: &mut [Option<TableRouteValue>],
table_routes: &mut [Option<DeserializedValueWithBytes<TableRouteValue>>],
) -> Result<()> {
let keys = table_routes
.iter()
.flat_map(|table_route| {
table_route
.as_ref()
.map(extract_address_keys)
.map(|x| extract_address_keys(&x.inner))
.unwrap_or_default()
})
.collect::<HashSet<_>>()

View File

@@ -33,7 +33,7 @@ use crate::rpc::store::{
// The TopicRegionKey is a key for the topic-region mapping in the kvbackend.
// The layout of the key is `__topic_region/{topic_name}/{region_id}`.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicRegionKey<'a> {
pub region_id: RegionId,
pub topic: &'a str,

View File

@@ -62,10 +62,12 @@ pub(crate) struct RegionMigrationEvent {
impl RegionMigrationEvent {
pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
// FIXME(weny): handle multiple region ids.
let region_id = ctx.region_ids[0];
Self {
region_id: ctx.region_id,
table_id: ctx.region_id.table_id(),
region_number: ctx.region_id.region_number(),
region_id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
trigger_reason: ctx.trigger_reason,
src_node_id: ctx.from_peer.id,
src_peer_addr: ctx.from_peer.addr.clone(),

View File

@@ -26,6 +26,7 @@ pub(crate) mod update_metadata;
pub(crate) mod upgrade_candidate_region;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::time::Duration;
@@ -36,7 +37,6 @@ use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::RegionFailureDetectorControllerRef;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
@@ -56,9 +56,9 @@ pub use manager::{
RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
RegionMigrationTriggerReason,
};
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, TableId};
use tokio::time::Instant;
use self::migration_start::RegionMigrationStart;
@@ -73,6 +73,25 @@ use crate::service::mailbox::MailboxRef;
/// The default timeout for region migration.
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
/// Deserialization helper that accepts either a single `T` or a sequence of `T`.
///
/// The enum is `untagged`, so no variant name appears in the serialized form.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    /// A lone value.
    Single(T),
    /// A sequence of values.
    Multiple(Vec<T>),
}
/// Deserializes either a single `T` or a sequence of `T` into a `Vec<T>`.
///
/// A single value becomes a one-element vector; a sequence is returned as is.
/// Intended for use with `#[serde(deserialize_with = "...")]`.
fn single_or_multiple_from<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
where
    D: Deserializer<'de>,
    T: Deserialize<'de>,
{
    SingleOrMultiple::<T>::deserialize(deserializer).map(|parsed| match parsed {
        SingleOrMultiple::Single(item) => vec![item],
        SingleOrMultiple::Multiple(items) => items,
    })
}
/// It's shared in each step and available even after recovering.
///
/// It will only be updated/stored after the Red node has succeeded.
@@ -89,7 +108,8 @@ pub struct PersistentContext {
/// The [Peer] of migration destination.
pub(crate) to_peer: Peer,
/// The [RegionId]s of the regions to migrate.
pub(crate) region_id: RegionId,
#[serde(deserialize_with = "single_or_multiple_from", alias = "region_id")]
pub(crate) region_ids: Vec<RegionId>,
/// The timeout for downgrading leader region and upgrading candidate region operations.
#[serde(with = "humantime_serde", default = "default_timeout")]
pub(crate) timeout: Duration,
@@ -104,14 +124,42 @@ fn default_timeout() -> Duration {
impl PersistentContext {
pub fn lock_key(&self) -> Vec<StringKey> {
let region_id = self.region_id;
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
RegionLock::Write(region_id).into(),
];
let mut lock_keys = Vec::with_capacity(self.region_ids.len() + 2);
lock_keys.push(CatalogLock::Read(&self.catalog).into());
lock_keys.push(SchemaLock::read(&self.catalog, &self.schema).into());
// Sort the region ids to ensure the same order of region ids.
let mut region_ids = self.region_ids.clone();
region_ids.sort_unstable();
for region_id in region_ids {
lock_keys.push(RegionLock::Write(region_id).into());
}
lock_keys
}
lock_key
/// Returns the distinct table ids that the migrating regions belong to.
///
/// Duplicates are removed by passing the ids through a [`HashSet`], so the order
/// of the returned vector is unspecified.
pub fn region_table_ids(&self) -> Vec<TableId> {
    let mut unique_table_ids = HashSet::new();
    for region_id in &self.region_ids {
        unique_table_ids.insert(region_id.table_id());
    }
    unique_table_ids.into_iter().collect()
}
/// Groups the migrating regions by the table they belong to.
///
/// Each key is a table id; its value lists that table's region ids in the same
/// relative order as they appear in `region_ids`.
pub fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
    let mut grouped: HashMap<TableId, Vec<RegionId>> = HashMap::new();
    for &region_id in &self.region_ids {
        grouped
            .entry(region_id.table_id())
            .or_default()
            .push(region_id);
    }
    grouped
}
}
@@ -227,23 +275,18 @@ pub struct VolatileContext {
/// `opening_region_guards` will be set after the
/// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step.
///
/// `opening_region_guard` should be consumed after
/// `opening_region_guards` should be consumed after
/// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
/// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
opening_region_guard: Option<OperatingRegionGuard>,
/// `datanode_table` is stored via previous steps for future use.
from_peer_datanode_table: Option<DatanodeTableValue>,
/// `table_info` is stored via previous steps for future use.
///
/// `table_info` should remain unchanged during the procedure;
/// no other DDL procedure executed concurrently for the current table.
table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
opening_region_guards: Vec<OperatingRegionGuard>,
/// The deadline of leader region lease.
leader_region_lease_deadline: Option<Instant>,
/// The last_entry_id of leader region.
leader_region_last_entry_id: Option<u64>,
/// The last_entry_id of leader metadata region (Only used for metric engine).
leader_region_metadata_last_entry_id: Option<u64>,
/// The datanode table values.
from_peer_datanode_table_values: Option<HashMap<TableId, DatanodeTableValue>>,
/// The last_entry_ids of leader regions.
leader_region_last_entry_ids: HashMap<RegionId, u64>,
/// The last_entry_ids of leader metadata regions (Only used for metric engine).
leader_region_metadata_last_entry_ids: HashMap<RegionId, u64>,
/// Metrics of region migration.
metrics: Metrics,
}
@@ -262,13 +305,15 @@ impl VolatileContext {
}
/// Records the `last_entry_id` of the given leader region in `leader_region_last_entry_ids`.
pub fn set_last_entry_id(&mut self, last_entry_id: u64) {
self.leader_region_last_entry_id = Some(last_entry_id)
pub fn set_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
self.leader_region_last_entry_ids
.insert(region_id, last_entry_id);
}
/// Records the `last_entry_id` of the given leader metadata region in `leader_region_metadata_last_entry_ids`.
pub fn set_metadata_last_entry_id(&mut self, last_entry_id: u64) {
self.leader_region_metadata_last_entry_id = Some(last_entry_id);
pub fn set_metadata_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
self.leader_region_metadata_last_entry_ids
.insert(region_id, last_entry_id);
}
}
@@ -317,7 +362,7 @@ impl DefaultContextFactory {
impl ContextFactory for DefaultContextFactory {
fn new_context(self, persistent_ctx: PersistentContext) -> Context {
Context {
persistent_ctx: Arc::new(persistent_ctx),
persistent_ctx,
volatile_ctx: self.volatile_ctx,
in_memory: self.in_memory_key,
table_metadata_manager: self.table_metadata_manager,
@@ -332,7 +377,7 @@ impl ContextFactory for DefaultContextFactory {
/// The context of procedure execution.
pub struct Context {
persistent_ctx: Arc<PersistentContext>,
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
in_memory: KvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
@@ -391,6 +436,47 @@ impl Context {
&self.server_addr
}
/// Returns the distinct table ids that the migrating regions belong to.
///
/// Delegates to [`PersistentContext::region_table_ids`] so the de-duplication
/// logic lives in one place; the order of the returned ids is unspecified.
pub fn region_table_ids(&self) -> Vec<TableId> {
    self.persistent_ctx.region_table_ids()
}
/// Fetches the table routes of every table touched by this migration from the
/// metadata store.
///
/// The result is keyed by table id. Tables for which no route was returned are
/// left out of the map rather than reported as an error.
///
/// Retry:
/// - Failed to retrieve the metadata of table.
pub async fn get_table_route_values(
    &self,
) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableRouteValue>>> {
    let table_ids = self.persistent_ctx.region_table_ids();
    let fetched = self
        .table_metadata_manager
        .table_route_manager()
        .table_route_storage()
        .batch_get_with_raw_bytes(&table_ids)
        .await
        .context(error::TableMetadataManagerSnafu)
        .map_err(BoxedError::new)
        .with_context(|_| error::RetryLaterWithSourceSnafu {
            reason: format!("Failed to get table routes: {table_ids:?}"),
        })?;

    // Pair each requested table id with the entry at the same position in the
    // response, keeping only the tables that actually have a route.
    let mut routes_by_table = HashMap::with_capacity(table_ids.len());
    for (table_id, maybe_route) in table_ids.into_iter().zip(fetched) {
        if let Some(route) = maybe_route {
            routes_by_table.insert(table_id, route);
        }
    }
    Ok(routes_by_table)
}
/// Returns the `table_route` of [VolatileContext] if any.
/// Otherwise, returns the value retrieved from remote.
///
@@ -398,9 +484,9 @@ impl Context {
/// - Failed to retrieve the metadata of table.
pub async fn get_table_route_value(
&self,
table_id: TableId,
) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
let table_id = self.persistent_ctx.region_id.table_id();
let table_route = self
let table_route_value = self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
@@ -409,11 +495,76 @@ impl Context {
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableRoute: {table_id}"),
reason: format!("Failed to get table routes: {table_id:}"),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;
Ok(table_route_value)
}
Ok(table_route)
/// Returns the datanode table values of the source peer, keyed by table id.
///
/// The values are loaded from the metadata store on the first call and kept in
/// the [VolatileContext]; later calls return the stored map without another fetch.
///
/// Retry:
/// - Failed to retrieve the metadata of datanode table.
pub async fn get_from_peer_datanode_table_values(
    &mut self,
) -> Result<&HashMap<TableId, DatanodeTableValue>> {
    if self.volatile_ctx.from_peer_datanode_table_values.is_none() {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let table_ids = self.persistent_ctx.region_table_ids();
        let keys: Vec<_> = table_ids
            .iter()
            .map(|&table_id| DatanodeTableKey {
                datanode_id,
                table_id,
            })
            .collect();

        let fetched = self
            .table_metadata_manager
            .datanode_table_manager()
            .batch_get(&keys)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get DatanodeTable: {table_ids:?}"),
            })?;

        // Every key shares the same datanode id, so the table id alone identifies a value.
        let mut values_by_table = HashMap::with_capacity(fetched.len());
        for (key, value) in fetched {
            values_by_table.insert(key.table_id, value);
        }
        self.volatile_ctx.from_peer_datanode_table_values = Some(values_by_table);
    }

    // The branch above guarantees the option is populated at this point.
    Ok(self
        .volatile_ctx
        .from_peer_datanode_table_values
        .as_ref()
        .unwrap())
}
/// Fetches the [`DatanodeTableValue`] of `table_id` on the source peer from the
/// metadata store.
///
/// The value is looked up on every call; nothing is stored in the [VolatileContext].
///
/// Retry:
/// - Failed to retrieve the metadata of datanode table.
pub async fn get_from_peer_datanode_table_value(
    &self,
    table_id: TableId,
) -> Result<DatanodeTableValue> {
    let datanode_id = self.persistent_ctx.from_peer.id;
    let key = DatanodeTableKey {
        datanode_id,
        table_id,
    };

    let value = self
        .table_metadata_manager
        .datanode_table_manager()
        .get(&key)
        .await
        .context(error::TableMetadataManagerSnafu)
        .map_err(BoxedError::new)
        .with_context(|_| error::RetryLaterWithSourceSnafu {
            reason: format!("Failed to get DatanodeTable: {table_id}"),
        })?
        // A missing entry is reported as a dedicated not-found error.
        .context(error::DatanodeTableNotFoundSnafu {
            table_id,
            datanode_id,
        })?;
    Ok(value)
}
/// Notifies the RegionSupervisor to register failure detectors of failed region.
@@ -422,11 +573,18 @@ impl Context {
/// Now, we need to register the failure detector for the failed region again.
pub async fn register_failure_detectors(&self) {
let datanode_id = self.persistent_ctx.from_peer.id;
let region_id = self.persistent_ctx.region_id;
let region_ids = &self.persistent_ctx.region_ids;
let detecting_regions = region_ids
.iter()
.map(|region_id| (datanode_id, *region_id))
.collect::<Vec<_>>();
self.region_failure_detector_controller
.register_failure_detectors(vec![(datanode_id, region_id)])
.register_failure_detectors(detecting_regions)
.await;
info!(
"Registered failure detectors after migration failures for datanode {}, regions {:?}",
datanode_id, region_ids
);
}
/// Notifies the RegionSupervisor to deregister failure detectors.
@@ -435,10 +593,14 @@ impl Context {
/// We need to deregister the failure detectors for the original region if the procedure is finished.
pub async fn deregister_failure_detectors(&self) {
let datanode_id = self.persistent_ctx.from_peer.id;
let region_id = self.persistent_ctx.region_id;
let region_ids = &self.persistent_ctx.region_ids;
let detecting_regions = region_ids
.iter()
.map(|region_id| (datanode_id, *region_id))
.collect::<Vec<_>>();
self.region_failure_detector_controller
.deregister_failure_detectors(vec![(datanode_id, region_id)])
.deregister_failure_detectors(detecting_regions)
.await;
}
@@ -448,112 +610,52 @@ impl Context {
/// so we need to deregister the failure detectors for the candidate region if the procedure is aborted.
pub async fn deregister_failure_detectors_for_candidate_region(&self) {
let to_peer_id = self.persistent_ctx.to_peer.id;
let region_id = self.persistent_ctx.region_id;
let region_ids = &self.persistent_ctx.region_ids;
let detecting_regions = region_ids
.iter()
.map(|region_id| (to_peer_id, *region_id))
.collect::<Vec<_>>();
self.region_failure_detector_controller
.deregister_failure_detectors(vec![(to_peer_id, region_id)])
.deregister_failure_detectors(detecting_regions)
.await;
}
/// Returns the `table_info` of [VolatileContext] if any.
/// Otherwise, returns the value retrieved from remote.
///
/// Retry:
/// - Failed to retrieve the metadata of table.
pub async fn get_table_info_value(
&mut self,
) -> Result<&DeserializedValueWithBytes<TableInfoValue>> {
let table_info_value = &mut self.volatile_ctx.table_info;
if table_info_value.is_none() {
let table_id = self.persistent_ctx.region_id.table_id();
let table_info = self
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get TableInfo: {table_id}"),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?;
*table_info_value = Some(table_info);
}
Ok(table_info_value.as_ref().unwrap())
}
/// Returns the `table_info` of [VolatileContext] if any.
/// Otherwise, returns the value retrieved from remote.
///
/// Retry:
/// - Failed to retrieve the metadata of datanode.
pub async fn get_from_peer_datanode_table_value(&mut self) -> Result<&DatanodeTableValue> {
let datanode_value = &mut self.volatile_ctx.from_peer_datanode_table;
if datanode_value.is_none() {
let table_id = self.persistent_ctx.region_id.table_id();
let datanode_id = self.persistent_ctx.from_peer.id;
let datanode_table = self
.table_metadata_manager
.datanode_table_manager()
.get(&DatanodeTableKey {
datanode_id,
table_id,
})
.await
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
})?
.context(error::DatanodeTableNotFoundSnafu {
table_id,
datanode_id,
})?;
*datanode_value = Some(datanode_table);
}
Ok(datanode_value.as_ref().unwrap())
}
/// Fetches the replay checkpoint for the given topic.
pub async fn fetch_replay_checkpoint(&self, topic: &str) -> Result<Option<ReplayCheckpoint>> {
let region_id = self.region_id();
let topic_region_key = TopicRegionKey::new(region_id, topic);
let value = self
/// Fetches the replay checkpoints for the given topic region keys.
pub async fn get_replay_checkpoints(
&self,
topic_region_keys: Vec<TopicRegionKey<'_>>,
) -> Result<HashMap<RegionId, ReplayCheckpoint>> {
let topic_region_values = self
.table_metadata_manager
.topic_region_manager()
.get(topic_region_key)
.batch_get(topic_region_keys)
.await
.context(error::TableMetadataManagerSnafu)?;
Ok(value.and_then(|value| value.checkpoint))
}
let replay_checkpoints = topic_region_values
.into_iter()
.flat_map(|(key, value)| value.checkpoint.map(|value| (key, value)))
.collect::<HashMap<_, _>>();
/// Returns the [RegionId].
pub fn region_id(&self) -> RegionId {
self.persistent_ctx.region_id
Ok(replay_checkpoints)
}
/// Broadcasts the invalidate table cache message.
pub async fn invalidate_table_cache(&self) -> Result<()> {
let table_id = self.region_id().table_id();
let table_ids = self.region_table_ids();
let mut cache_idents = Vec::with_capacity(table_ids.len());
for table_id in &table_ids {
cache_idents.push(CacheIdent::TableId(*table_id));
}
// ignore the result
let ctx = common_meta::cache_invalidator::Context::default();
let _ = self
.cache_invalidator
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
.await;
let _ = self.cache_invalidator.invalidate(&ctx, &cache_idents).await;
Ok(())
}
/// Returns the [PersistentContext] of the procedure.
pub fn persistent_ctx(&self) -> Arc<PersistentContext> {
pub fn persistent_ctx(&self) -> PersistentContext {
self.persistent_ctx.clone()
}
}
@@ -595,7 +697,7 @@ pub struct RegionMigrationData<'a> {
pub(crate) struct RegionMigrationProcedure {
state: Box<dyn State>,
context: Context,
_guard: Option<RegionMigrationProcedureGuard>,
_guards: Vec<RegionMigrationProcedureGuard>,
}
impl RegionMigrationProcedure {
@@ -604,22 +706,22 @@ impl RegionMigrationProcedure {
pub fn new(
persistent_context: PersistentContext,
context_factory: impl ContextFactory,
guard: Option<RegionMigrationProcedureGuard>,
guards: Vec<RegionMigrationProcedureGuard>,
) -> Self {
let state = Box::new(RegionMigrationStart {});
Self::new_inner(state, persistent_context, context_factory, guard)
Self::new_inner(state, persistent_context, context_factory, guards)
}
fn new_inner(
state: Box<dyn State>,
persistent_context: PersistentContext,
context_factory: impl ContextFactory,
guard: Option<RegionMigrationProcedureGuard>,
guards: Vec<RegionMigrationProcedureGuard>,
) -> Self {
Self {
state,
context: context_factory.new_context(persistent_context),
_guard: guard,
_guards: guards,
}
}
@@ -632,20 +734,26 @@ impl RegionMigrationProcedure {
persistent_ctx,
state,
} = serde_json::from_str(json).context(FromJsonSnafu)?;
let guards = persistent_ctx
.region_ids
.iter()
.flat_map(|region_id| {
tracker.insert_running_procedure(&RegionMigrationProcedureTask {
region_id: *region_id,
from_peer: persistent_ctx.from_peer.clone(),
to_peer: persistent_ctx.to_peer.clone(),
timeout: persistent_ctx.timeout,
trigger_reason: persistent_ctx.trigger_reason,
})
})
.collect::<Vec<_>>();
let guard = tracker.insert_running_procedure(&RegionMigrationProcedureTask {
region_id: persistent_ctx.region_id,
from_peer: persistent_ctx.from_peer.clone(),
to_peer: persistent_ctx.to_peer.clone(),
timeout: persistent_ctx.timeout,
trigger_reason: persistent_ctx.trigger_reason,
});
let context = context_factory.new_context(persistent_ctx);
Ok(Self {
state,
context,
_guard: guard,
_guards: guards,
})
}
@@ -653,27 +761,25 @@ impl RegionMigrationProcedure {
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&["rollback"])
.start_timer();
let table_id = self.context.region_id().table_id();
let region_id = self.context.region_id();
let table_metadata_manager = self.context.table_metadata_manager.clone();
let table_route = self.context.get_table_route_value().await?;
// Safety: It must be a physical table route.
let downgraded = table_route
.region_routes()
.unwrap()
.iter()
.filter(|route| route.region.id == region_id)
.any(|route| route.is_leader_downgrading());
if downgraded {
let table_lock = TableLock::Write(region_id.table_id()).into();
let ctx = &self.context;
let table_regions = ctx.persistent_ctx.table_regions();
for (table_id, regions) in table_regions {
let table_lock = TableLock::Write(table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
info!("Rollbacking downgraded region leader table route, region: {region_id}");
table_metadata_manager
let table_route = ctx.get_table_route_value(table_id).await?;
let region_routes = table_route.region_routes().unwrap();
let downgraded = region_routes
.iter()
.filter(|route| regions.contains(&route.region.id))
.any(|route| route.is_leader_downgrading());
if downgraded {
info!(
"Rollbacking downgraded region leader table route, table: {table_id}, regions: {regions:?}"
);
let table_metadata_manager = &ctx.table_metadata_manager;
table_metadata_manager
.update_leader_region_status(table_id, &table_route, |route| {
if route.region.id == region_id {
if regions.contains(&route.region.id) {
Some(None)
} else {
None
@@ -683,13 +789,13 @@ impl RegionMigrationProcedure {
.context(error::TableMetadataManagerSnafu)
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
reason: format!("Failed to update the table route during the rollback downgraded leader region: {regions:?}"),
})?;
self.context
.deregister_failure_detectors_for_candidate_region()
.await;
}
}
self.context
.deregister_failure_detectors_for_candidate_region()
.await;
self.context.register_failure_detectors().await;
Ok(())
@@ -732,14 +838,14 @@ impl Procedure for RegionMigrationProcedure {
Err(ProcedureError::retry_later(e))
} else {
// Consumes the opening region guard before deregistering the failure detectors.
self.context.volatile_ctx.opening_region_guard.take();
self.context.volatile_ctx.opening_region_guards.clear();
self.context
.deregister_failure_detectors_for_candidate_region()
.await;
error!(
e;
"Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
self.context.region_id(),
"Region migration procedure failed, regions: {:?}, from_peer: {}, to_peer: {}, {}",
self.context.persistent_ctx.region_ids,
self.context.persistent_ctx.from_peer,
self.context.persistent_ctx.to_peer,
self.context.volatile_ctx.metrics,
@@ -766,7 +872,7 @@ impl Procedure for RegionMigrationProcedure {
}
fn user_metadata(&self) -> Option<UserMetadata> {
Some(UserMetadata::new(self.context.persistent_ctx()))
Some(UserMetadata::new(Arc::new(self.context.persistent_ctx())))
}
}
@@ -780,7 +886,6 @@ mod tests {
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute};
use super::update_metadata::UpdateMetadata;
use super::*;
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
@@ -803,7 +908,7 @@ mod tests {
let env = TestingEnv::new();
let context = env.context_factory();
let procedure = RegionMigrationProcedure::new(persistent_context, context, None);
let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
let key = procedure.lock_key();
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
@@ -820,10 +925,10 @@ mod tests {
let env = TestingEnv::new();
let context = env.context_factory();
let procedure = RegionMigrationProcedure::new(persistent_context, context, None);
let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
let serialized = procedure.dump().unwrap();
let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}
@@ -864,7 +969,7 @@ mod tests {
let persistent_context = new_persistent_context();
let context_factory = env.context_factory();
let state = Box::<MockState>::default();
RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, None)
RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, vec![])
}
let ctx = TestingEnv::procedure_context();
@@ -887,7 +992,9 @@ mod tests {
let mut procedure =
RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
.unwrap();
assert!(tracker.contains(procedure.context.persistent_ctx.region_id));
for region_id in &procedure.context.persistent_ctx.region_ids {
assert!(tracker.contains(*region_id));
}
for _ in 1..3 {
status = Some(procedure.execute(&ctx).await.unwrap());
@@ -927,9 +1034,34 @@ mod tests {
vec![
// MigrationStart
Step::next(
"Should be the update metadata for downgrading",
"Should be the open candidate region",
None,
Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
Assertion::simple(assert_open_candidate_region, assert_need_persist),
),
// OpenCandidateRegion
Step::next(
"Should be the flush leader region",
Some(mock_datanode_reply(
to_peer_id,
Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
)),
Assertion::simple(assert_flush_leader_region, assert_no_persist),
),
// Flush Leader Region
Step::next(
"Should be the flush leader region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(move |id| {
Ok(new_flush_region_reply_for_region(
id,
RegionId::new(1024, 1),
true,
None,
))
}),
)),
Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
),
// UpdateMetadata::Downgrade
Step::next(
@@ -988,7 +1120,7 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
@@ -1015,61 +1147,6 @@ mod tests {
runner.suite.verify_table_metadata().await;
}
#[tokio::test]
async fn test_procedure_flow_idempotent() {
common_telemetry::init_default_ut_logging();
let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let state = Box::new(RegionMigrationStart);
// The table metadata.
let from_peer_id = persistent_context.from_peer.id;
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
follower_peers: vec![to_peer],
..Default::default()
}];
let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
suite.init_table_metadata(table_info, region_routes).await;
let steps = procedure_flow_steps(from_peer_id, to_peer_id);
let setup_to_latest_persisted_state = Step::setup(
"Sets state to UpdateMetadata::Downgrade",
merge_before_test_fn(vec![
setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
Arc::new(reset_volatile_ctx),
]),
);
let steps = [
steps.clone(),
vec![setup_to_latest_persisted_state.clone()],
steps.clone()[1..].to_vec(),
vec![setup_to_latest_persisted_state],
steps.clone()[1..].to_vec(),
]
.concat();
let timer = Instant::now();
// Run the table tests.
let runner = ProcedureMigrationSuiteRunner::new(suite)
.steps(steps.clone())
.run_once()
.await;
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
runner.suite.verify_table_metadata().await;
}
#[tokio::test]
async fn test_procedure_flow_open_candidate_region_retryable_error() {
common_telemetry::init_default_ut_logging();
@@ -1080,7 +1157,7 @@ mod tests {
// The table metadata.
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
@@ -1168,13 +1245,12 @@ mod tests {
let from_peer_id = persistent_context.from_peer.id;
let to_peer_id = persistent_context.to_peer.id;
let from_peer = persistent_context.from_peer.clone();
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(from_peer),
follower_peers: vec![to_peer],
follower_peers: vec![],
..Default::default()
}];
@@ -1184,9 +1260,34 @@ mod tests {
let steps = vec![
// MigrationStart
Step::next(
"Should be the update metadata for downgrading",
"Should be the open candidate region",
None,
Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
Assertion::simple(assert_open_candidate_region, assert_need_persist),
),
// OpenCandidateRegion
Step::next(
"Should be the flush leader region",
Some(mock_datanode_reply(
to_peer_id,
Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
)),
Assertion::simple(assert_flush_leader_region, assert_no_persist),
),
// Flush Leader Region
Step::next(
"Should be the flush leader region",
Some(mock_datanode_reply(
from_peer_id,
Arc::new(move |id| {
Ok(new_flush_region_reply_for_region(
id,
RegionId::new(1024, 1),
true,
None,
))
}),
)),
Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
),
// UpdateMetadata::Downgrade
Step::next(
@@ -1230,9 +1331,9 @@ mod tests {
];
let setup_to_latest_persisted_state = Step::setup(
"Sets state to UpdateMetadata::Downgrade",
"Sets state to OpenCandidateRegion",
merge_before_test_fn(vec![
setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
Arc::new(reset_volatile_ctx),
]),
);
@@ -1264,7 +1365,7 @@ mod tests {
let to_peer_id = persistent_context.to_peer.id;
let from_peer_id = persistent_context.from_peer.id;
let from_peer = persistent_context.from_peer.clone();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),

View File

@@ -19,7 +19,6 @@ use api::v1::meta::MailboxMessage;
use common_meta::RegionIdent;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
@@ -47,12 +46,12 @@ impl State for CloseDowngradedRegion {
) -> Result<(Box<dyn State>, Status)> {
if let Err(err) = self.close_downgraded_leader_region(ctx).await {
let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
let region_id = ctx.region_id();
warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode);
let region_ids = &ctx.persistent_ctx.region_ids;
warn!(err; "Failed to close downgraded leader regions: {region_ids:?} on datanode {:?}", downgrade_leader_datanode);
}
info!(
"Region migration is finished: region_id: {}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
ctx.region_id(),
"Region migration is finished: regions: {:?}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
ctx.persistent_ctx.region_ids,
ctx.persistent_ctx.from_peer,
ctx.persistent_ctx.to_peer,
ctx.persistent_ctx.trigger_reason,
@@ -74,28 +73,30 @@ impl CloseDowngradedRegion {
/// Builds the [`Instruction::CloseRegions`] instruction for the `from_peer` datanode.
///
/// Every [`RegionIdent`] in the instruction carries the id of `from_peer` as its
/// `datanode_id`; the batch form emits one ident per entry of `persistent_ctx.region_ids`.
async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
let pc = &ctx.persistent_ctx;
// The downgraded leader regions live on the migration source (`from_peer`).
let downgrade_leader_datanode_id = pc.from_peer.id;
// Single-region form: table id and region number come from the one `region_id`.
let table_id = pc.region_id.table_id();
let region_number = pc.region_id.region_number();
// Single-region form: the datanode table value is fetched only to read the engine name.
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
// Batch form: pre-size the ident list to the number of regions being migrated.
let region_ids = &ctx.persistent_ctx.region_ids;
let mut idents = Vec::with_capacity(region_ids.len());
let RegionInfo { engine, .. } = datanode_table_value.region_info.clone();
for region_id in region_ids {
idents.push(RegionIdent {
datanode_id: downgrade_leader_datanode_id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
// The `engine` field is not used for closing region.
engine: String::new(),
});
}
// Single-region form: exactly one ident, carrying the engine read above.
Ok(Instruction::CloseRegions(vec![RegionIdent {
datanode_id: downgrade_leader_datanode_id,
table_id,
region_number,
engine,
}]))
// Batch form: one ident per migrated region.
Ok(Instruction::CloseRegions(idents))
}
/// Closes the downgraded leader region.
async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> {
let close_instruction = self.build_close_region_instruction(ctx).await?;
let region_id = ctx.region_id();
let region_ids = &ctx.persistent_ctx.region_ids;
let pc = &ctx.persistent_ctx;
let downgrade_leader_datanode = &pc.from_peer;
let msg = MailboxMessage::json_message(
&format!("Close downgraded region: {}", region_id),
&format!("Close downgraded regions: {:?}", region_ids),
&format!("Metasrv@{}", ctx.server_addr()),
&format!(
"Datanode-{}@{}",
@@ -118,8 +119,8 @@ impl CloseDowngradedRegion {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received close downgraded leade region reply: {:?}, region: {}",
reply, region_id
"Received close downgraded leade region reply: {:?}, region: {:?}",
reply, region_ids
);
let InstructionReply::CloseRegions(SimpleReply { result, error }) = reply else {
return error::UnexpectedInstructionReplySnafu {
@@ -134,7 +135,7 @@ impl CloseDowngradedRegion {
} else {
error::UnexpectedSnafu {
violated: format!(
"Failed to close downgraded leader region: {region_id} on datanode {:?}, error: {error:?}",
"Failed to close downgraded leader region: {region_ids:?} on datanode {:?}, error: {error:?}",
downgrade_leader_datanode,
),
}

View File

@@ -22,7 +22,7 @@ use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -70,30 +70,30 @@ impl State for DowngradeLeaderRegion {
Ok(_) => {
// Do nothing
info!(
"Downgraded region leader success, region: {}",
ctx.persistent_ctx.region_id
"Downgraded region leader success, region: {:?}",
ctx.persistent_ctx.region_ids
);
}
Err(error::Error::ExceededDeadline { .. }) => {
info!(
"Downgrade region leader exceeded deadline, region: {}",
ctx.persistent_ctx.region_id
"Downgrade region leader exceeded deadline, region: {:?}",
ctx.persistent_ctx.region_ids
);
// Rolls back the metadata if the procedure has timed out.
return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
}
Err(err) => {
error!(err; "Occurs non-retryable error, region: {}", ctx.persistent_ctx.region_id);
error!(err; "Occurs non-retryable error, region: {:?}", ctx.persistent_ctx.region_ids);
if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
info!(
"Running into the downgrade region leader slow path, region: {}, sleep until {:?}",
ctx.persistent_ctx.region_id, deadline
"Running into the downgrade region leader slow path, region: {:?}, sleep until {:?}",
ctx.persistent_ctx.region_ids, deadline
);
tokio::time::sleep_until(*deadline).await;
} else {
warn!(
"Leader region lease deadline is not set, region: {}",
ctx.persistent_ctx.region_id
"Leader region lease deadline is not set, region: {:?}",
ctx.persistent_ctx.region_ids
);
}
}
@@ -118,12 +118,76 @@ impl DowngradeLeaderRegion {
ctx: &Context,
flush_timeout: Duration,
) -> Instruction {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
Instruction::DowngradeRegions(vec![DowngradeRegion {
let region_ids = &ctx.persistent_ctx.region_ids;
let mut downgrade_regions = Vec::with_capacity(region_ids.len());
for region_id in region_ids {
downgrade_regions.push(DowngradeRegion {
region_id: *region_id,
flush_timeout: Some(flush_timeout),
});
}
Instruction::DowngradeRegions(downgrade_regions)
}
/// Processes the [`DowngradeRegionReply`] of a single region.
///
/// - Fails with a `RetryLater` error when the reply carries an error.
/// - Logs whether the region existed on the `from_peer` datanode.
/// - Stores `last_entry_id` and `metadata_last_entry_id` (when present) in the volatile
///   context, together with the region id taken from the reply.
///
/// `now` is only read to report the elapsed time in log and error messages.
fn handle_downgrade_region_reply(
&self,
ctx: &mut Context,
reply: &DowngradeRegionReply,
now: &Instant,
) -> Result<()> {
let leader = &ctx.persistent_ctx.from_peer;
let DowngradeRegionReply {
region_id,
flush_timeout: Some(flush_timeout),
}])
last_entry_id,
metadata_last_entry_id,
exists,
error,
} = reply;
// Any error reported by the datanode is surfaced as `RetryLater`.
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id, leader, error, now.elapsed()
),
}
.fail();
}
// A missing region is only logged; it does not fail the step.
if !exists {
warn!(
"Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
region_id,
leader,
now.elapsed()
);
} else {
info!(
"Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
region_id,
leader,
last_entry_id,
metadata_last_entry_id,
now.elapsed()
);
}
// Record the reported entry ids for this region in the volatile context.
if let Some(last_entry_id) = last_entry_id {
debug!(
"set last_entry_id: {:?}, region_id: {:?}",
last_entry_id, region_id
);
ctx.volatile_ctx
.set_last_entry_id(*region_id, *last_entry_id);
}
if let Some(metadata_last_entry_id) = metadata_last_entry_id {
ctx.volatile_ctx
.set_metadata_last_entry_id(*region_id, *metadata_last_entry_id);
}
Ok(())
}
/// Tries to downgrade a leader region.
@@ -140,7 +204,7 @@ impl DowngradeLeaderRegion {
/// - [ExceededDeadline](error::Error::ExceededDeadline)
/// - Invalid JSON.
async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
let region_id = ctx.persistent_ctx.region_id;
let region_ids = &ctx.persistent_ctx.region_ids;
let operation_timeout =
ctx.next_operation_timeout()
.context(error::ExceededDeadlineSnafu {
@@ -150,7 +214,7 @@ impl DowngradeLeaderRegion {
let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
&format!("Downgrade leader region: {}", region_id),
&format!("Downgrade leader regions: {:?}", region_ids),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
@@ -168,9 +232,9 @@ impl DowngradeLeaderRegion {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received downgrade region reply: {:?}, region: {}, elapsed: {:?}",
"Received downgrade region reply: {:?}, region: {:?}, elapsed: {:?}",
reply,
region_id,
region_ids,
now.elapsed()
);
let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
@@ -182,57 +246,14 @@ impl DowngradeLeaderRegion {
.fail();
};
// TODO(weny): handle multiple replies.
let DowngradeRegionReply {
region_id,
last_entry_id,
metadata_last_entry_id,
exists,
error,
} = &replies[0];
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id, leader, error, now.elapsed()
),
}
.fail();
for reply in replies {
self.handle_downgrade_region_reply(ctx, &reply, &now)?;
}
if !exists {
warn!(
"Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
region_id,
leader,
now.elapsed()
);
} else {
info!(
"Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
region_id,
leader,
last_entry_id,
metadata_last_entry_id,
now.elapsed()
);
}
if let Some(last_entry_id) = last_entry_id {
ctx.volatile_ctx.set_last_entry_id(*last_entry_id);
}
if let Some(metadata_last_entry_id) = metadata_last_entry_id {
ctx.volatile_ctx
.set_metadata_last_entry_id(*metadata_last_entry_id);
}
Ok(())
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for downgrade leader region {region_id} on datanode {:?}, elapsed: {:?}",
"Mailbox received timeout for downgrade leader region {region_ids:?} on datanode {:?}, elapsed: {:?}",
leader,
now.elapsed()
);
@@ -248,7 +269,7 @@ impl DowngradeLeaderRegion {
let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
Err(err) => {
error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {:?}", leader, ctx.persistent_ctx.region_ids);
return;
}
};
@@ -266,8 +287,8 @@ impl DowngradeLeaderRegion {
if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
ctx.volatile_ctx.reset_leader_region_lease_deadline();
info!(
"Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
leader, last_connection_at, region_lease, ctx.persistent_ctx.region_id
"Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}",
leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids
);
} else if elapsed > 0 {
// `now - last_connection_at` < REGION_LEASE_SECS * 1000
@@ -277,23 +298,23 @@ impl DowngradeLeaderRegion {
ctx.volatile_ctx
.set_leader_region_lease_deadline(lease_timeout);
info!(
"Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {}",
"Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {:?}",
leader,
last_connection_at,
elapsed,
ctx.volatile_ctx.leader_region_lease_deadline,
ctx.persistent_ctx.region_id
ctx.persistent_ctx.region_ids
);
} else {
warn!(
"Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {}",
leader, last_connection_at, now, ctx.persistent_ctx.region_id
"Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {:?}",
leader, last_connection_at, now, ctx.persistent_ctx.region_ids
)
}
} else {
warn!(
"Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {}",
leader, ctx.persistent_ctx.region_id
"Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {:?}",
leader, ctx.persistent_ctx.region_ids
)
}
}
@@ -318,19 +339,20 @@ impl DowngradeLeaderRegion {
retry += 1;
// Throws the error immediately if the procedure exceeded the deadline.
if matches!(err, error::Error::ExceededDeadline { .. }) {
error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
error!(err; "Failed to downgrade region leader, regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids);
return Err(err);
} else if matches!(err, error::Error::PusherNotFound { .. }) {
// Throws the error immediately if the datanode is unreachable.
error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
error!(err; "Failed to downgrade region leader, regions: {:?}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id);
self.update_leader_region_lease_deadline(ctx).await;
return Err(err);
} else if err.is_retryable() && retry < self.optimistic_retry {
error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id);
error!(err; "Failed to downgrade region leader, regions: {:?}, retry later", ctx.persistent_ctx.region_ids);
sleep(self.retry_initial_interval).await;
} else {
return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
region_id: ctx.persistent_ctx.region_id,
// TODO(weny): handle multiple regions.
region_id: ctx.persistent_ctx.region_ids[0],
})?;
}
} else {
@@ -372,17 +394,17 @@ mod tests {
schema: "public".into(),
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
region_ids: vec![RegionId::new(1024, 1)],
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
}
}
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
let table_info =
new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
let region_id = ctx.persistent_ctx.region_ids[0];
let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(ctx.persistent_ctx.region_id),
region: Region::new_test(region_id),
leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
..Default::default()
@@ -590,7 +612,13 @@ mod tests {
});
state.downgrade_region_with_retry(&mut ctx).await.unwrap();
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
assert_eq!(
ctx.volatile_ctx
.leader_region_last_entry_ids
.get(&RegionId::new(0, 0))
.cloned(),
Some(1)
);
assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
}
@@ -636,7 +664,7 @@ mod tests {
.await
.unwrap_err();
assert_matches!(err, error::Error::DowngradeLeader { .. });
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
// assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
// Should remain no change.
assert_eq!(
ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
@@ -671,7 +699,13 @@ mod tests {
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2);
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
assert_eq!(
ctx.volatile_ctx
.leader_region_last_entry_ids
.get(&RegionId::new(0, 0))
.cloned(),
Some(1)
);
assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
let _ = next

View File

@@ -15,7 +15,7 @@
use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{FlushRegions, Instruction, InstructionReply};
use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
@@ -64,8 +64,10 @@ impl PreFlushRegion {
/// Builds the [`Instruction::FlushRegions`] instruction for the leader region(s) recorded
/// in the persistent context.
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
let pc = &ctx.persistent_ctx;
// Single-region form: flush only `region_id`.
let region_id = pc.region_id;
Instruction::FlushRegions(FlushRegions::sync_single(region_id))
// Batch form: one flush request covering every id in `region_ids`.
// NOTE(review): `FlushErrorStrategy::TryAll` presumably keeps flushing the remaining
// regions after one of them fails — confirm against the `FlushErrorStrategy` definition.
Instruction::FlushRegions(FlushRegions::sync_batch(
pc.region_ids.clone(),
FlushErrorStrategy::TryAll,
))
}
/// Tries to flush a leader region.
@@ -88,11 +90,11 @@ impl PreFlushRegion {
operation: "Flush leader region",
})?;
let flush_instruction = self.build_flush_leader_region_instruction(ctx);
let region_id = ctx.persistent_ctx.region_id;
let region_ids = &ctx.persistent_ctx.region_ids;
let leader = &ctx.persistent_ctx.from_peer;
let msg = MailboxMessage::json_message(
&format!("Flush leader region: {}", region_id),
&format!("Flush leader region: {:?}", region_ids),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", leader.id, leader.addr),
common_time::util::current_time_millis(),
@@ -111,32 +113,42 @@ impl PreFlushRegion {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
"Received flush leader region reply: {:?}, region: {:?}, elapsed: {:?}",
reply,
region_id,
region_ids,
now.elapsed()
);
let reply_result = match reply {
InstructionReply::FlushRegions(flush_reply) => {
if flush_reply.results.len() != 1 {
if flush_reply.results.len() != region_ids.len() {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect single region flush result",
reason: format!(
"expect {} region flush result, but got {}",
region_ids.len(),
flush_reply.results.len()
),
}
.fail();
}
let (reply_region_id, result) = &flush_reply.results[0];
if *reply_region_id != region_id {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "flush reply region ID mismatch",
}
.fail();
}
match result {
Ok(()) => (true, None),
Err(err) => (false, Some(err.clone())),
match flush_reply.overall_success {
true => (true, None),
false => (
false,
Some(
flush_reply
.results
.iter()
.filter_map(|(region_id, result)| match result {
Ok(_) => None,
Err(e) => Some(format!("{}: {}", region_id, e)),
})
.collect::<Vec<String>>()
.join("; "),
),
),
}
}
_ => {
@@ -149,15 +161,15 @@ impl PreFlushRegion {
};
let (result, error) = reply_result;
if error.is_some() {
if let Some(error) = error {
warn!(
"Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
region_id, leader, error
"Failed to flush leader regions {:?} on datanode {:?}, error: {}. Skip flush operation.",
region_ids, leader, &error
);
} else if result {
info!(
"The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
region_id,
"The flush leader regions {:?} on datanode {:?} is successful, elapsed: {:?}",
region_ids,
leader,
now.elapsed()
);
@@ -166,15 +178,15 @@ impl PreFlushRegion {
Ok(())
}
Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
operation: "Flush leader region",
operation: "Flush leader regions",
}
.fail(),
Err(err) => Err(err),
},
Err(Error::PusherNotFound { .. }) => {
warn!(
"Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_id, leader
"Failed to flush leader regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
region_ids, leader
);
Ok(())
}
@@ -268,7 +280,7 @@ mod tests {
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();
@@ -297,7 +309,7 @@ mod tests {
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let mut env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let mailbox_ctx = env.mailbox_context();

View File

@@ -387,14 +387,14 @@ impl RegionMigrationManager {
PersistentContext {
catalog: catalog_name,
schema: schema_name,
region_id,
region_ids: vec![region_id],
from_peer,
to_peer,
timeout,
trigger_reason,
},
self.context_factory.clone(),
Some(guard),
vec![guard],
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;

View File

@@ -44,9 +44,9 @@ impl State for RegionMigrationAbort {
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
warn!(
"Region migration is aborted: {}, region_id: {}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
"Region migration is aborted: {}, regions: {:?}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}",
self.reason,
ctx.region_id(),
ctx.persistent_ctx.region_ids,
ctx.persistent_ctx.from_peer,
ctx.persistent_ctx.to_peer,
ctx.persistent_ctx.trigger_reason,

View File

@@ -20,22 +20,18 @@ use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::error::{self, Result};
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{Context, State};
/// The behaviors:
///
/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state.
///
/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
///
/// Otherwise go to the [OpenCandidateRegion] state.
/// - If all regions have been migrated, transitions to [RegionMigrationEnd].
/// - If any of the region leaders is not the `from_peer`, transitions to [RegionMigrationAbort].
/// - Otherwise, continues with [OpenCandidateRegion] to initiate the candidate region.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationStart;
@@ -44,44 +40,62 @@ pub struct RegionMigrationStart;
impl State for RegionMigrationStart {
/// Yields next [State].
///
/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state.
/// Determines the next [State] for region migration:
///
/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
///
/// Otherwise go to the [OpenCandidateRegion] state.
/// - If all regions have been migrated, transitions to [RegionMigrationEnd].
/// - If any of the region leaders is not the `from_peer`, transitions to [RegionMigrationAbort].
/// - Otherwise, continues with [OpenCandidateRegion] to initiate the candidate region.
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
// Single-region form: look up the route of the one `region_id`.
let region_id = ctx.persistent_ctx.region_id;
let region_route = self.retrieve_region_route(ctx, region_id).await?;
// Batch form: look up the routes of every region in `persistent_ctx.region_ids`.
let mut region_routes = self.retrieve_region_routes(ctx).await?;
let to_peer = &ctx.persistent_ctx.to_peer;
let from_peer = &ctx.persistent_ctx.from_peer;
let region_ids = &ctx.persistent_ctx.region_ids;
if self.has_migrated(&region_route, to_peer)? {
// Drops the routes that `has_migrated` reports as already migrated to `to_peer`.
self.filter_unmigrated_regions(&mut region_routes, to_peer);
// No region to migrate, skip the migration.
if region_routes.is_empty() {
info!(
"Region has been migrated, region: {:?}, to_peer: {:?}",
region_route.region.id, to_peer
"All regions have been migrated, regions: {:?}, to_peer: {:?}",
region_ids, to_peer
);
Ok((Box::new(RegionMigrationEnd), Status::done()))
} else if self.invalid_leader_peer(&region_route, from_peer)? {
info!(
"Abort region migration, region:{:?}, unexpected leader peer: {:?}, expected: {:?}",
region_route.region.id, region_route.leader_peer, from_peer,
);
Ok((
Box::new(RegionMigrationAbort::new(&format!(
"Invalid region leader peer: {from_peer:?}, expected: {:?}",
region_route.leader_peer.as_ref().unwrap(),
))),
Status::done(),
))
} else if self.check_candidate_region_on_peer(&region_route, to_peer) {
Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true)))
} else {
Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
return Ok((Box::new(RegionMigrationEnd), Status::done()));
}
// Updates the region ids to the unmigrated regions.
// A length mismatch means the filter above removed at least one route.
if region_routes.len() != region_ids.len() {
let unmigrated_region_ids = region_routes.iter().map(|route| route.region.id).collect();
info!(
"Some of the regions have been migrated, only migrate the following regions: {:?}, to_peer: {:?}",
unmigrated_region_ids, to_peer
);
// Overwrites `persistent_ctx.region_ids` with the ids of the remaining routes.
ctx.persistent_ctx.region_ids = unmigrated_region_ids;
}
// Checks if any of the region leaders is not the `from_peer`.
for region_route in &region_routes {
if self.invalid_leader_peer(region_route, from_peer)? {
info!(
"Abort region migration, region:{}, unexpected leader peer: {:?}, expected: {:?}",
region_route.region.id, region_route.leader_peer, from_peer,
);
return Ok((
Box::new(RegionMigrationAbort::new(&format!(
"Invalid region leader peer: {:?}, expected: {:?}",
// `invalid_leader_peer` returned `Ok`, which it only does when `leader_peer`
// is `Some`, so this `unwrap` cannot panic here.
region_route.leader_peer.as_ref().unwrap(),
from_peer,
))),
Status::done(),
));
}
}
// If all checks pass, open the candidate region.
Ok((Box::new(OpenCandidateRegion), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
@@ -90,7 +104,7 @@ impl State for RegionMigrationStart {
}
impl RegionMigrationStart {
/// Retrieves region route.
/// Retrieves region routes for multiple regions.
///
/// Abort(non-retry):
/// - TableRoute is not found.
@@ -98,39 +112,32 @@ impl RegionMigrationStart {
///
/// Retry:
/// - Failed to retrieve the metadata of table.
async fn retrieve_region_route(
&self,
ctx: &mut Context,
region_id: RegionId,
) -> Result<RegionRoute> {
let table_id = region_id.table_id();
let table_route = ctx.get_table_route_value().await?;
async fn retrieve_region_routes(&self, ctx: &mut Context) -> Result<Vec<RegionRoute>> {
let region_ids = &ctx.persistent_ctx.region_ids;
let table_route_values = ctx.get_table_route_values().await?;
let mut region_routes = Vec::with_capacity(region_ids.len());
for region_id in region_ids {
let table_id = region_id.table_id();
let region_route = table_route_values
.get(&table_id)
.context(error::TableRouteNotFoundSnafu { table_id })?
.region_routes()
.with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
})?
.iter()
.find(|route| route.region.id == *region_id)
.cloned()
.with_context(|| error::UnexpectedSnafu {
violated: format!(
"RegionRoute({}) is not found in TableRoute({})",
region_id, table_id
),
})?;
region_routes.push(region_route);
}
let region_route = table_route
.region_routes()
.context(error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
})?
.iter()
.find(|route| route.region.id == region_id)
.cloned()
.context(error::UnexpectedSnafu {
violated: format!(
"RegionRoute({}) is not found in TableRoute({})",
region_id, table_id
),
})?;
Ok(region_route)
}
/// Checks whether the candidate region on region has been opened.
/// Returns true if it's been opened.
fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool {
region_route
.follower_peers
.iter()
.any(|peer| peer.id == to_peer.id)
Ok(region_routes)
}
/// Returns true if the region leader is not the `from_peer`.
@@ -143,7 +150,7 @@ impl RegionMigrationStart {
let is_invalid_leader_peer = region_route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
.with_context(|| error::UnexpectedSnafu {
violated: format!("Leader peer is not found in TableRoute({})", region_id),
})?
.id
@@ -151,6 +158,12 @@ impl RegionMigrationStart {
Ok(is_invalid_leader_peer)
}
/// Retains only the routes of regions that have not been migrated to `to_peer` yet.
///
/// A route is removed only when `has_migrated` returns `Ok(true)` for it. If the check
/// fails, the error is discarded and the route is kept.
fn filter_unmigrated_regions(&self, region_routes: &mut Vec<RegionRoute>, to_peer: &Peer) {
    region_routes.retain(|route| !matches!(self.has_migrated(route, to_peer), Ok(true)));
}
/// Checks whether the region has been migrated.
/// Returns true if it has.
///
@@ -162,7 +175,7 @@ impl RegionMigrationStart {
let region_migrated = region_route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
.with_context(|| error::UnexpectedSnafu {
violated: format!("Leader peer is not found in TableRoute({})", region_id),
})?
.id
@@ -173,6 +186,7 @@ impl RegionMigrationStart {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::key::test_utils::new_test_table_info;
@@ -183,7 +197,6 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
fn new_persistent_context() -> PersistentContext {
@@ -196,14 +209,8 @@ mod tests {
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let err = state
.retrieve_region_route(&mut ctx, RegionId::new(1024, 1))
.await
.unwrap_err();
let err = state.retrieve_region_routes(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::TableRouteNotFound { .. });
assert!(!err.is_retryable());
}
@@ -216,56 +223,20 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024, vec![1]).into();
let table_info = new_test_table_info(1024, vec![3]).into();
let region_route = RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
region: Region::new_test(RegionId::new(1024, 3)),
leader_peer: Some(from_peer.clone()),
..Default::default()
};
env.create_physical_table_metadata(table_info, vec![region_route])
.await;
let err = state
.retrieve_region_route(&mut ctx, RegionId::new(1024, 3))
.await
.unwrap_err();
let err = state.retrieve_region_routes(&mut ctx).await.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_next_update_metadata_downgrade_state() {
let mut state = Box::new(RegionMigrationStart);
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(from_peer_id)),
follower_peers: vec![to_peer],
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);
}
#[tokio::test]
async fn test_next_migration_end_state() {
let mut state = Box::new(RegionMigrationStart);
@@ -274,7 +245,7 @@ mod tests {
let persistent_context = new_persistent_context();
let to_peer = persistent_context.to_peer.clone();
let from_peer = persistent_context.from_peer.clone();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
@@ -302,7 +273,7 @@ mod tests {
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
@@ -327,12 +298,12 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
let region_routes: Vec<RegionRoute> = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(1024)),
..Default::default()

View File

@@ -66,33 +66,43 @@ impl OpenCandidateRegion {
/// Abort(non-retry):
/// - Datanode Table is not found.
async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
let pc = &ctx.persistent_ctx;
let table_id = pc.region_id.table_id();
let region_number = pc.region_id.region_number();
let candidate_id = pc.to_peer.id;
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
let region_ids = ctx.persistent_ctx.region_ids.clone();
let from_peer_id = ctx.persistent_ctx.from_peer.id;
let to_peer_id = ctx.persistent_ctx.to_peer.id;
let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
let mut open_regions = Vec::with_capacity(region_ids.len());
let RegionInfo {
region_storage_path,
region_options,
region_wal_options,
engine,
} = datanode_table_value.region_info.clone();
let open_instruction = Instruction::OpenRegions(vec![OpenRegion::new(
RegionIdent {
datanode_id: candidate_id,
table_id,
region_number,
for region_id in region_ids {
let table_id = region_id.table_id();
let region_number = region_id.region_number();
let datanode_table_value = datanode_table_values.get(&table_id).context(
error::DatanodeTableNotFoundSnafu {
table_id,
datanode_id: from_peer_id,
},
)?;
let RegionInfo {
region_storage_path,
region_options,
region_wal_options,
engine,
},
&region_storage_path,
region_options,
region_wal_options,
true,
)]);
} = datanode_table_value.region_info.clone();
Ok(open_instruction)
open_regions.push(OpenRegion::new(
RegionIdent {
datanode_id: to_peer_id,
table_id,
region_number,
engine,
},
&region_storage_path,
region_options,
region_wal_options,
true,
));
}
Ok(Instruction::OpenRegions(open_regions))
}
/// Opens the candidate region.
@@ -112,25 +122,27 @@ impl OpenCandidateRegion {
) -> Result<()> {
let pc = &ctx.persistent_ctx;
let vc = &mut ctx.volatile_ctx;
let region_id = pc.region_id;
let region_ids = &pc.region_ids;
let candidate = &pc.to_peer;
// This method might be invoked multiple times.
// Only registers the guard if `opening_region_guard` is absent.
if vc.opening_region_guard.is_none() {
// Registers the opening region.
let guard = ctx
.opening_region_keeper
.register(candidate.id, region_id)
.context(error::RegionOpeningRaceSnafu {
peer_id: candidate.id,
region_id,
})?;
vc.opening_region_guard = Some(guard);
if vc.opening_region_guards.is_empty() {
for region_id in region_ids {
// Registers the opening region.
let guard = ctx
.opening_region_keeper
.register(candidate.id, *region_id)
.context(error::RegionOpeningRaceSnafu {
peer_id: candidate.id,
region_id: *region_id,
})?;
vc.opening_region_guards.push(guard);
}
}
let msg = MailboxMessage::json_message(
&format!("Open candidate region: {}", region_id),
&format!("Open candidate regions: {:?}", region_ids),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", candidate.id, candidate.addr),
common_time::util::current_time_millis(),
@@ -154,9 +166,9 @@ impl OpenCandidateRegion {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received open region reply: {:?}, region: {}, elapsed: {:?}",
"Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
reply,
region_id,
region_ids,
now.elapsed()
);
let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
@@ -172,7 +184,7 @@ impl OpenCandidateRegion {
} else {
error::RetryLaterSnafu {
reason: format!(
"Region {region_id} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
"Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
candidate,
now.elapsed()
),
@@ -182,7 +194,7 @@ impl OpenCandidateRegion {
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for open candidate region {region_id} on datanode {:?}, elapsed: {:?}",
"Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
candidate,
now.elapsed()
);
@@ -255,7 +267,7 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let to_peer_id = persistent_context.to_peer.id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
@@ -276,7 +288,7 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let to_peer_id = persistent_context.to_peer.id;
let env = TestingEnv::new();
@@ -302,7 +314,7 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();
@@ -335,7 +347,7 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();
@@ -370,7 +382,7 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();
@@ -410,14 +422,14 @@ mod tests {
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let region_id = persistent_context.region_id;
let region_id = persistent_context.region_ids[0];
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();
// Prepares table
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(persistent_context.region_id),
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(from_peer_id)),
..Default::default()
}];
@@ -445,10 +457,7 @@ mod tests {
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let vc = ctx.volatile_ctx;
assert_eq!(
vc.opening_region_guard.unwrap().info(),
(to_peer_id, region_id)
);
assert_eq!(vc.opening_region_guards[0].info(), (to_peer_id, region_id));
let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
assert_matches!(flush_leader_region, PreFlushRegion);

View File

@@ -190,7 +190,7 @@ pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> Persis
schema: "public".into(),
from_peer: Peer::empty(from),
to_peer: Peer::empty(to),
region_id,
region_ids: vec![region_id],
timeout: Duration::from_secs(10),
trigger_reason: RegionMigrationTriggerReason::default(),
}
@@ -306,37 +306,38 @@ impl ProcedureMigrationTestSuite {
/// Verifies table metadata after region migration.
pub(crate) async fn verify_table_metadata(&self) {
let region_id = self.context.persistent_ctx.region_id;
let table_route = self
.env
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(region_id.table_id())
.await
.unwrap()
.unwrap();
let region_routes = table_route.region_routes().unwrap();
for region_id in &self.context.persistent_ctx.region_ids {
let table_route = self
.env
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(region_id.table_id())
.await
.unwrap()
.unwrap();
let region_routes = table_route.region_routes().unwrap();
let expected_leader_id = self.context.persistent_ctx.to_peer.id;
let removed_follower_id = self.context.persistent_ctx.from_peer.id;
let expected_leader_id = self.context.persistent_ctx.to_peer.id;
let removed_follower_id = self.context.persistent_ctx.from_peer.id;
let region_route = region_routes
.iter()
.find(|route| route.region.id == region_id)
.unwrap();
assert!(!region_route.is_leader_downgrading());
assert_eq!(
region_route.leader_peer.as_ref().unwrap().id,
expected_leader_id
);
assert!(
!region_route
.follower_peers
let region_route = region_routes
.iter()
.any(|route| route.id == removed_follower_id)
)
.find(|route| route.region.id == *region_id)
.unwrap();
assert!(!region_route.is_leader_downgrading());
assert_eq!(
region_route.leader_peer.as_ref().unwrap().id,
expected_leader_id
);
assert!(
!region_route
.follower_peers
.iter()
.any(|route| route.id == removed_follower_id)
)
}
}
}

View File

@@ -18,7 +18,6 @@ pub(crate) mod upgrade_candidate_region;
use std::any::Any;
use common_meta::lock_key::TableLock;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
@@ -48,12 +47,10 @@ impl State for UpdateMetadata {
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = TableLock::Write(ctx.region_id().table_id()).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_id).await;
match self {
UpdateMetadata::Downgrade => {
self.downgrade_leader_region(ctx).await?;
self.downgrade_leader_region(ctx, &procedure_ctx.provider)
.await?;
Ok((
Box::<DowngradeLeaderRegion>::default(),
@@ -61,7 +58,8 @@ impl State for UpdateMetadata {
))
}
UpdateMetadata::Upgrade => {
self.upgrade_candidate_region(ctx).await?;
self.upgrade_candidate_region(ctx, &procedure_ctx.provider)
.await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(
@@ -71,7 +69,8 @@ impl State for UpdateMetadata {
Ok((Box::new(CloseDowngradedRegion), Status::executing(false)))
}
UpdateMetadata::Rollback => {
self.rollback_downgraded_region(ctx).await?;
self.rollback_downgraded_region(ctx, &procedure_ctx.provider)
.await?;
if let Err(err) = ctx.invalidate_table_cache().await {
warn!(

View File

@@ -13,7 +13,10 @@
// limitations under the License.
use common_error::ext::BoxedError;
use common_meta::lock_key::TableLock;
use common_meta::rpc::router::LeaderState;
use common_procedure::ContextProviderRef;
use common_telemetry::{error, info};
use snafu::ResultExt;
use crate::error::{self, Result};
@@ -37,35 +40,46 @@ impl UpdateMetadata {
/// It will only update **other region** info. Therefore, it's safe to retry after failure.
///
/// - There is no other DDL procedure executed concurrently for the current table.
pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> {
pub async fn downgrade_leader_region(
&self,
ctx: &mut Context,
ctx_provider: &ContextProviderRef,
) -> Result<()> {
let table_metadata_manager = ctx.table_metadata_manager.clone();
let from_peer_id = ctx.persistent_ctx.from_peer.id;
let region_id = ctx.region_id();
let table_id = region_id.table_id();
let current_table_route_value = ctx.get_table_route_value().await?;
let table_regions = ctx.persistent_ctx.table_regions();
// TODO(weny): ensures the leader region peer is the `from_peer`.
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, &current_table_route_value, |route| {
if route.region.id == region_id
&& route
.leader_peer
.as_ref()
.is_some_and(|leader_peer| leader_peer.id == from_peer_id)
{
Some(Some(LeaderState::Downgrading))
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
{
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the downgrading leader region, region_id: {region_id}, from_peer_id: {from_peer_id}"
),
});
for (table_id, regions) in table_regions {
let table_lock = TableLock::Write(table_id).into();
let _guard = ctx_provider.acquire_lock(&table_lock).await;
let current_table_route_value = ctx.get_table_route_value(table_id).await?;
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, &current_table_route_value, |route| {
if regions.contains(&route.region.id)
&& route
.leader_peer
.as_ref()
.is_some_and(|leader_peer| leader_peer.id == from_peer_id)
{
Some(Some(LeaderState::Downgrading))
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
{
error!(err; "Failed to update the table route during the downgrading leader region, regions: {regions:?}, from_peer_id: {from_peer_id}");
return Err(BoxedError::new(err)).with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to update the table route during the downgrading leader region, regions: {regions:?}, from_peer_id: {from_peer_id}"
),
});
}
info!(
"Downgrading leader region table route success, table_id: {table_id}, regions: {regions:?}, from_peer_id: {from_peer_id}"
);
}
Ok(())
@@ -75,10 +89,13 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_procedure_test::MockContextProvider;
use store_api::storage::RegionId;
use crate::error::Error;
@@ -104,8 +121,12 @@ mod tests {
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let provider = Arc::new(MockContextProvider::new(HashMap::new())) as _;
let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
let err = state
.downgrade_leader_region(&mut ctx, &provider)
.await
.unwrap_err();
assert_matches!(err, Error::TableRouteNotFound { .. });
@@ -119,7 +140,7 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_id = ctx.region_id().table_id();
let table_id = ctx.persistent_ctx.region_ids[0].table_id();
let table_info = new_test_table_info(1024, vec![1, 2]).into();
let region_routes = vec![RegionRoute {
@@ -162,7 +183,7 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_id = ctx.region_id().table_id();
let table_id = ctx.persistent_ctx.region_ids[0].table_id();
let table_info = new_test_table_info(1024, vec![1, 2]).into();
let region_routes = vec![RegionRoute {

View File

@@ -13,6 +13,9 @@
// limitations under the License.
use common_error::ext::BoxedError;
use common_meta::lock_key::TableLock;
use common_procedure::ContextProviderRef;
use common_telemetry::{error, info};
use snafu::ResultExt;
use crate::error::{self, Result};
@@ -28,28 +31,39 @@ impl UpdateMetadata {
/// Retry:
/// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
/// - Failed to retrieve the metadata of table.
pub async fn rollback_downgraded_region(&self, ctx: &mut Context) -> Result<()> {
pub async fn rollback_downgraded_region(
&self,
ctx: &mut Context,
ctx_provider: &ContextProviderRef,
) -> Result<()> {
let table_metadata_manager = ctx.table_metadata_manager.clone();
let region_id = ctx.region_id();
let table_id = region_id.table_id();
let current_table_route_value = ctx.get_table_route_value().await?;
let table_regions = ctx.persistent_ctx.table_regions();
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, &current_table_route_value, |route| {
if route.region.id == region_id {
Some(None)
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
{
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
});
for (table_id, regions) in table_regions {
let table_lock = TableLock::Write(table_id).into();
let _guard = ctx_provider.acquire_lock(&table_lock).await;
let current_table_route_value = ctx.get_table_route_value(table_id).await?;
if let Err(err) = table_metadata_manager
.update_leader_region_status(table_id, &current_table_route_value, |route| {
if regions.contains(&route.region.id) {
Some(None)
} else {
None
}
})
.await
.context(error::TableMetadataManagerSnafu)
{
error!(err; "Failed to update the table route during the rollback downgraded leader regions: {regions:?}");
return Err(BoxedError::new(err)).with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the rollback downgraded leader regions: {regions:?}"),
});
}
info!(
"Rolling back downgraded leader region table route success, table_id: {table_id}, regions: {regions:?}"
);
}
ctx.register_failure_detectors().await;
Ok(())
@@ -59,10 +73,13 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use common_procedure_test::MockContextProvider;
use store_api::storage::RegionId;
use crate::error::Error;
@@ -82,7 +99,11 @@ mod tests {
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
let provider = Arc::new(MockContextProvider::new(HashMap::new())) as _;
let err = state
.rollback_downgraded_region(&mut ctx, &provider)
.await
.unwrap_err();
assert_matches!(err, Error::TableRouteNotFound { .. });
@@ -97,7 +118,7 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let table_id = ctx.region_id().table_id();
let table_id = ctx.persistent_ctx.region_ids[0].table_id();
let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
let region_routes = vec![

View File

@@ -14,9 +14,12 @@
use common_error::ext::BoxedError;
use common_meta::key::datanode_table::RegionInfo;
use common_meta::lock_key::TableLock;
use common_meta::rpc::router::{RegionRoute, region_distribution};
use common_telemetry::{info, warn};
use common_procedure::ContextProviderRef;
use common_telemetry::{error, info, warn};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::RegionId;
use crate::error::{self, Result};
use crate::procedure::region_migration::Context;
@@ -24,104 +27,114 @@ use crate::procedure::region_migration::update_metadata::UpdateMetadata;
impl UpdateMetadata {
/// Returns new [Vec<RegionRoute>].
async fn build_upgrade_candidate_region_metadata(
fn build_upgrade_candidate_region_metadata(
&self,
ctx: &mut Context,
region_ids: &[RegionId],
mut region_routes: Vec<RegionRoute>,
) -> Result<Vec<RegionRoute>> {
let region_id = ctx.region_id();
let table_route_value = ctx.get_table_route_value().await?.clone();
let old_leader_peer = &ctx.persistent_ctx.from_peer;
let new_leader_peer = &ctx.persistent_ctx.to_peer;
for region_id in region_ids {
// Find the RegionRoute for this region_id.
let region_route = region_routes
.iter_mut()
.find(|route| route.region.id == *region_id)
.context(error::RegionRouteNotFoundSnafu {
region_id: *region_id,
})?;
let mut region_routes = table_route_value
.region_routes()
.context(error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
})?
.clone();
let region_route = region_routes
.iter_mut()
.find(|route| route.region.id == region_id)
.context(error::RegionRouteNotFoundSnafu { region_id })?;
// Remove any "downgraded leader" state.
region_route.set_leader_state(None);
// Removes downgraded status.
region_route.set_leader_state(None);
let candidate = &ctx.persistent_ctx.to_peer;
let expected_old_leader = &ctx.persistent_ctx.from_peer;
// Upgrades candidate to leader.
ensure!(
region_route
.leader_peer
.take_if(|old_leader| old_leader.id == expected_old_leader.id)
.is_some(),
error::UnexpectedSnafu {
violated: format!(
"Unexpected region leader: {:?} during the upgrading candidate metadata, expected: {:?}",
region_route.leader_peer, expected_old_leader
),
}
);
region_route.leader_peer = Some(candidate.clone());
info!(
"Upgrading candidate region to leader region: {:?} for region: {}",
candidate, region_id
);
// Removes the candidate region in followers.
let removed = region_route
.follower_peers
.extract_if(.., |peer| peer.id == candidate.id)
.collect::<Vec<_>>();
if removed.len() > 1 {
warn!(
"Removes duplicated regions: {removed:?} during the upgrading candidate metadata for region: {region_id}"
);
}
Ok(region_routes)
}
/// Returns true if region metadata has been updated.
async fn check_metadata_updated(&self, ctx: &mut Context) -> Result<bool> {
let region_id = ctx.region_id();
let table_route_value = ctx.get_table_route_value().await?.clone();
let region_routes = table_route_value
.region_routes()
.context(error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{self:?} is a non-physical TableRouteValue."),
})?
.clone();
let region_route = region_routes
.into_iter()
.find(|route| route.region.id == region_id)
.context(error::RegionRouteNotFoundSnafu { region_id })?;
let leader_peer = region_route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
violated: format!("The leader peer of region {region_id} is not found during the update metadata for upgrading"),
})?;
let candidate_peer_id = ctx.persistent_ctx.to_peer.id;
if leader_peer.id == candidate_peer_id {
// Check old leader matches expectation before upgrading to new leader.
ensure!(
!region_route.is_leader_downgrading(),
region_route
.leader_peer
.take_if(|old_leader| old_leader.id == old_leader_peer.id)
.is_some(),
error::UnexpectedSnafu {
violated: format!(
"Unexpected intermediate state is found during the update metadata for upgrading region {region_id}"
"Unexpected region leader: {:?} during the candidate-to-leader upgrade; expected: {:?}",
region_route.leader_peer, old_leader_peer
),
}
);
Ok(true)
} else {
Ok(false)
// Set new leader.
region_route.leader_peer = Some(new_leader_peer.clone());
// Remove new leader from followers (avoids duplicate leader/follower).
let removed = region_route
.follower_peers
.extract_if(.., |peer| peer.id == new_leader_peer.id)
.collect::<Vec<_>>();
// Warn if more than one follower with the new leader id was present.
if removed.len() > 1 {
warn!(
"Removed duplicate followers: {removed:?} during candidate-to-leader upgrade for region: {region_id}"
);
}
}
info!(
"Building metadata for upgrading candidate region to new leader: {:?} for regions: {:?}",
new_leader_peer, region_ids,
);
Ok(region_routes)
}
/// Checks if metadata has been upgraded for a list of regions by verifying if their
/// leader peers have been switched to a specified peer ID (`to_peer_id`) and that
/// no region is in a leader downgrading state.
///
/// Returns:
/// - `Ok(true)` if all regions' leader is the target peer and no downgrading occurs.
/// - `Ok(false)` if any region's leader is not the target peer.
/// - Error if region route or leader peer cannot be found, or an unexpected state is detected.
fn check_metadata_updated(
&self,
ctx: &mut Context,
region_ids: &[RegionId],
region_routes: &[RegionRoute],
) -> Result<bool> {
// Iterate through each provided region ID
for region_id in region_ids {
// Find the route info for this region
let region_route = region_routes
.iter()
.find(|route| route.region.id == *region_id)
.context(error::RegionRouteNotFoundSnafu {
region_id: *region_id,
})?;
// Get the leader peer for the region, error if not found
let leader_peer = region_route.leader_peer.as_ref().with_context(||error::UnexpectedSnafu {
violated: format!(
"The leader peer of region {region_id} is not found during the metadata upgrade check"
),
})?;
// If the leader is not the expected peer, return false (i.e., not yet upgraded)
if leader_peer.id != ctx.persistent_ctx.to_peer.id {
return Ok(false);
} else {
// If leader matches but region is in leader downgrading state, error (unexpected state)
ensure!(
!region_route.is_leader_downgrading(),
error::UnexpectedSnafu {
violated: format!(
"Unexpected intermediate state is found during the metadata upgrade check for region {region_id}"
),
}
);
}
}
// All regions' leader match expected peer and are not downgrading; considered upgraded
Ok(true)
}
/// Upgrades the candidate region.
@@ -133,55 +146,77 @@ impl UpdateMetadata {
/// Retry:
/// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue).
/// - Failed to retrieve the metadata of table.
pub async fn upgrade_candidate_region(&self, ctx: &mut Context) -> Result<()> {
let region_id = ctx.region_id();
pub async fn upgrade_candidate_region(
&self,
ctx: &mut Context,
ctx_provider: &ContextProviderRef,
) -> Result<()> {
let table_metadata_manager = ctx.table_metadata_manager.clone();
let table_regions = ctx.persistent_ctx.table_regions();
let from_peer_id = ctx.persistent_ctx.from_peer.id;
let to_peer_id = ctx.persistent_ctx.to_peer.id;
if self.check_metadata_updated(ctx).await? {
return Ok(());
for (table_id, region_ids) in table_regions {
let table_lock = TableLock::Write(table_id).into();
let _guard = ctx_provider.acquire_lock(&table_lock).await;
let table_route_value = ctx.get_table_route_value(table_id).await?;
let region_routes = table_route_value.region_routes().with_context(|_| {
error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
}
})?;
if self.check_metadata_updated(ctx, &region_ids, region_routes)? {
continue;
}
let datanode_table_value = ctx.get_from_peer_datanode_table_value(table_id).await?;
let RegionInfo {
region_storage_path,
region_options,
region_wal_options,
engine,
} = datanode_table_value.region_info.clone();
let new_region_routes = self.build_upgrade_candidate_region_metadata(
ctx,
&region_ids,
region_routes.clone(),
)?;
// Log the distribution the table is about to be updated to, i.e. the upgraded routes,
// rather than the routes currently stored in the table route value.
let region_distribution = region_distribution(&new_region_routes);
info!(
    "Trying to update region routes to {:?} for table: {}",
    region_distribution, table_id,
);
if let Err(err) = table_metadata_manager
.update_table_route(
table_id,
RegionInfo {
engine: engine.clone(),
region_storage_path: region_storage_path.clone(),
region_options: region_options.clone(),
region_wal_options: region_wal_options.clone(),
},
&table_route_value,
new_region_routes,
&region_options,
&region_wal_options,
)
.await
.context(error::TableMetadataManagerSnafu)
{
error!(err; "Failed to update the table route during the upgrading candidate region: {region_ids:?}, from_peer_id: {from_peer_id}");
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the upgrading candidate region: {table_id}"),
});
};
info!(
"Upgrading candidate region table route success, table_id: {table_id}, regions: {region_ids:?}, to_peer_id: {to_peer_id}"
);
}
let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?;
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
let RegionInfo {
region_storage_path,
region_options,
region_wal_options,
engine,
} = datanode_table_value.region_info.clone();
let table_route_value = ctx.get_table_route_value().await?;
let region_distribution = region_distribution(&region_routes);
info!(
"Trying to update region routes to {:?} for table: {}",
region_distribution,
region_id.table_id()
);
if let Err(err) = table_metadata_manager
.update_table_route(
region_id.table_id(),
RegionInfo {
engine: engine.clone(),
region_storage_path: region_storage_path.clone(),
region_options: region_options.clone(),
region_wal_options: region_wal_options.clone(),
},
&table_route_value,
region_routes,
&region_options,
&region_wal_options,
)
.await
.context(error::TableMetadataManagerSnafu)
{
return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
reason: format!("Failed to update the table route during the upgrading candidate region: {region_id}"),
});
};
ctx.deregister_failure_detectors().await;
// Consumes the guard.
ctx.volatile_ctx.opening_region_guard.take();
ctx.volatile_ctx.opening_region_guards.clear();
Ok(())
}
@@ -210,16 +245,11 @@ mod tests {
#[tokio::test]
async fn test_table_route_is_not_found_error() {
let state = UpdateMetadata::Upgrade;
let env = TestingEnv::new();
let persistent_context = new_persistent_context();
let mut ctx = env.context_factory().new_context(persistent_context);
let ctx = env.context_factory().new_context(persistent_context);
let err = state
.build_upgrade_candidate_region_metadata(&mut ctx)
.await
.unwrap_err();
let err = ctx.get_table_route_value(1024).await.unwrap_err();
assert_matches!(err, Error::TableRouteNotFound { .. });
assert!(!err.is_retryable());
@@ -238,13 +268,20 @@ mod tests {
leader_peer: Some(Peer::empty(4)),
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
let region_routes = table_route_value
.into_inner()
.into_physical_table_route()
.region_routes;
let err = state
.build_upgrade_candidate_region_metadata(&mut ctx)
.await
.build_upgrade_candidate_region_metadata(
&mut ctx,
&[RegionId::new(1024, 1)],
region_routes,
)
.unwrap_err();
assert_matches!(err, Error::RegionRouteNotFound { .. });
@@ -268,9 +305,17 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
let region_routes = table_route_value
.into_inner()
.into_physical_table_route()
.region_routes;
let err = state
.build_upgrade_candidate_region_metadata(&mut ctx)
.await
.build_upgrade_candidate_region_metadata(
&mut ctx,
&[RegionId::new(1024, 1)],
region_routes,
)
.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
@@ -297,9 +342,17 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let table_route_value = ctx.get_table_route_value(1024).await.unwrap();
let region_routes = table_route_value
.into_inner()
.into_physical_table_route()
.region_routes;
let new_region_routes = state
.build_upgrade_candidate_region_metadata(&mut ctx)
.await
.build_upgrade_candidate_region_metadata(
&mut ctx,
&[RegionId::new(1024, 1)],
region_routes,
)
.unwrap();
assert!(!new_region_routes[0].is_leader_downgrading());
@@ -327,8 +380,11 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
let table_routes = ctx.get_table_route_value(1024).await.unwrap();
let region_routes = table_routes.region_routes().unwrap();
let updated = state
.check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
.unwrap();
assert!(!updated);
}
@@ -352,7 +408,11 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let updated = state.check_metadata_updated(&mut ctx).await.unwrap();
let table_routes = ctx.get_table_route_value(1024).await.unwrap();
let region_routes = table_routes.region_routes().unwrap();
let updated = state
.check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
.unwrap();
assert!(updated);
}
@@ -376,7 +436,11 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let err = state.check_metadata_updated(&mut ctx).await.unwrap_err();
let table_routes = ctx.get_table_route_value(1024).await.unwrap();
let region_routes = table_routes.region_routes().unwrap();
let err = state
.check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes)
.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
assert!(err.to_string().contains("intermediate state"));
}
@@ -401,7 +465,7 @@ mod tests {
let guard = opening_keeper
.register(2, RegionId::new(table_id, 1))
.unwrap();
ctx.volatile_ctx.opening_region_guard = Some(guard);
ctx.volatile_ctx.opening_region_guards.push(guard);
env.create_physical_table_metadata(table_info, region_routes)
.await;
@@ -425,7 +489,7 @@ mod tests {
.unwrap();
let region_routes = table_route.region_routes().unwrap();
assert!(ctx.volatile_ctx.opening_region_guard.is_none());
assert!(ctx.volatile_ctx.opening_region_guards.is_empty());
assert_eq!(region_routes.len(), 1);
assert!(!region_routes[0].is_leader_downgrading());
assert!(region_routes[0].follower_peers.is_empty());

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashSet;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
@@ -20,10 +21,11 @@ use common_meta::ddl::utils::parse_region_wal_options;
use common_meta::instruction::{
Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
};
use common_meta::key::topic_region::TopicRegionKey;
use common_meta::lock_key::RemoteWalLock;
use common_meta::wal_options_allocator::extract_topic_from_wal_options;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, warn};
use common_telemetry::{error, info};
use common_wal::options::WalOptions;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
@@ -66,17 +68,9 @@ impl State for UpgradeCandidateRegion {
) -> Result<(Box<dyn State>, Status)> {
let now = Instant::now();
let region_wal_option = self.get_region_wal_option(ctx).await?;
let region_id = ctx.persistent_ctx.region_id;
if region_wal_option.is_none() {
warn!(
"Region {} wal options not found, during upgrade candidate region",
region_id
);
}
let topics = self.get_kafka_topics(ctx).await?;
if self
.upgrade_region_with_retry(ctx, procedure_ctx, region_wal_option.as_ref())
.upgrade_region_with_retry(ctx, procedure_ctx, topics)
.await
{
ctx.update_upgrade_candidate_region_elapsed(now);
@@ -93,24 +87,32 @@ impl State for UpgradeCandidateRegion {
}
impl UpgradeCandidateRegion {
async fn get_region_wal_option(&self, ctx: &mut Context) -> Result<Option<WalOptions>> {
let region_id = ctx.persistent_ctx.region_id;
match ctx.get_from_peer_datanode_table_value().await {
Ok(datanode_table_value) => {
let region_wal_options =
parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
.context(error::ParseWalOptionsSnafu)?;
Ok(region_wal_options.get(&region_id.region_number()).cloned())
async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> {
let table_regions = ctx.persistent_ctx.table_regions();
let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
let mut topics = HashSet::new();
for (table_id, regions) in table_regions {
let Some(datanode_table_value) = datanode_table_values.get(&table_id) else {
continue;
};
let region_wal_options =
parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
.context(error::ParseWalOptionsSnafu)?;
for region_id in regions {
let Some(WalOptions::Kafka(kafka_wal_options)) =
region_wal_options.get(&region_id.region_number())
else {
continue;
};
if !topics.contains(&kafka_wal_options.topic) {
topics.insert(kafka_wal_options.topic.clone());
}
}
Err(error::Error::DatanodeTableNotFound { datanode_id, .. }) => {
warn!(
"Datanode table not found, during upgrade candidate region, the target region might already been migrated, region_id: {}, datanode_id: {}",
region_id, datanode_id
);
Ok(None)
}
Err(e) => Err(e),
}
Ok(topics)
}
/// Builds upgrade region instruction.
@@ -119,35 +121,105 @@ impl UpgradeCandidateRegion {
ctx: &mut Context,
replay_timeout: Duration,
) -> Result<Instruction> {
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
// Try our best to retrieve replay checkpoint.
let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok();
let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| {
extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options)
}) {
ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
} else {
None
};
let region_ids = ctx.persistent_ctx.region_ids.clone();
let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
let mut region_topic = Vec::with_capacity(region_ids.len());
for region_id in region_ids.iter() {
let table_id = region_id.table_id();
if let Some(datanode_table_value) = datanode_table_values.get(&table_id)
&& let Some(topic) = extract_topic_from_wal_options(
*region_id,
&datanode_table_value.region_info.region_wal_options,
)
{
region_topic.push((*region_id, topic));
}
}
let upgrade_instruction = Instruction::UpgradeRegions(vec![
UpgradeRegion {
let replay_checkpoints = ctx
.get_replay_checkpoints(
region_topic
.iter()
.map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
.collect(),
)
.await?;
// Build upgrade regions instruction.
let mut upgrade_regions = Vec::with_capacity(region_ids.len());
for region_id in region_ids {
let last_entry_id = ctx
.volatile_ctx
.leader_region_last_entry_ids
.get(&region_id)
.copied();
let metadata_last_entry_id = ctx
.volatile_ctx
.leader_region_metadata_last_entry_ids
.get(&region_id)
.copied();
let checkpoint = replay_checkpoints.get(&region_id).copied();
upgrade_regions.push(UpgradeRegion {
region_id,
last_entry_id,
metadata_last_entry_id,
replay_timeout,
location_id: Some(ctx.persistent_ctx.from_peer.id),
replay_entry_id: None,
metadata_replay_entry_id: None,
}
.with_replay_entry_id(checkpoint.map(|c| c.entry_id))
.with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
]);
replay_entry_id: checkpoint.map(|c| c.entry_id),
metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id),
});
}
Ok(upgrade_instruction)
Ok(Instruction::UpgradeRegions(upgrade_regions))
}
fn handle_upgrade_region_reply(
&self,
ctx: &mut Context,
UpgradeRegionReply {
region_id,
ready,
exists,
error,
}: &UpgradeRegionReply,
now: &Instant,
) -> Result<()> {
let candidate = &ctx.persistent_ctx.to_peer;
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
region_id,
candidate,
error,
now.elapsed()
),
}
.fail();
}
ensure!(
exists,
error::UnexpectedSnafu {
violated: format!(
"Candidate region {} doesn't exist on datanode {:?}",
region_id, candidate
)
}
);
if self.require_ready && !ready {
return error::RetryLaterSnafu {
reason: format!(
"Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}",
region_id,
candidate,
now.elapsed()
),
}
.fail();
}
Ok(())
}
/// Tries to upgrade a candidate region.
@@ -175,11 +247,11 @@ impl UpgradeCandidateRegion {
.await?;
let pc = &ctx.persistent_ctx;
let region_id = pc.region_id;
let region_ids = &pc.region_ids;
let candidate = &pc.to_peer;
let msg = MailboxMessage::json_message(
&format!("Upgrade candidate region: {}", region_id),
&format!("Upgrade candidate regions: {:?}", region_ids),
&format!("Metasrv@{}", ctx.server_addr()),
&format!("Datanode-{}@{}", candidate.id, candidate.addr),
common_time::util::current_time_millis(),
@@ -192,9 +264,16 @@ impl UpgradeCandidateRegion {
let ch = Channel::Datanode(candidate.id);
let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
let now = Instant::now();
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
reply,
region_ids,
now.elapsed()
);
let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
else {
return error::UnexpectedInstructionReplySnafu {
@@ -203,51 +282,16 @@ impl UpgradeCandidateRegion {
}
.fail();
};
// TODO(weny): handle multiple replies.
let UpgradeRegionReply {
ready,
exists,
error,
..
} = &replies[0];
// Notes: The order of handling is important.
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to upgrade the region {} on datanode {:?}, error: {:?}",
region_id, candidate, error
),
}
.fail();
for reply in replies {
self.handle_upgrade_region_reply(ctx, &reply, &now)?;
}
ensure!(
exists,
error::UnexpectedSnafu {
violated: format!(
"Candidate region {} doesn't exist on datanode {:?}",
region_id, candidate
)
}
);
if self.require_ready && !ready {
return error::RetryLaterSnafu {
reason: format!(
"Candidate region {} still replaying the wal on datanode {:?}",
region_id, candidate
),
}
.fail();
}
Ok(())
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}",
"Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
candidate,
now.elapsed()
);
error::RetryLaterSnafu { reason }.fail()
}
@@ -262,26 +306,24 @@ impl UpgradeCandidateRegion {
&self,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
wal_options: Option<&WalOptions>,
topics: HashSet<String>,
) -> bool {
let mut retry = 0;
let mut upgraded = false;
let mut guards = Vec::with_capacity(topics.len());
loop {
let timer = Instant::now();
// If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade.
let _guard = if let Some(WalOptions::Kafka(kafka_wal_options)) = wal_options {
Some(
for topic in &topics {
guards.push(
procedure_ctx
.provider
.acquire_lock(
&(RemoteWalLock::Read(kafka_wal_options.topic.clone()).into()),
)
.acquire_lock(&(RemoteWalLock::Read(topic.clone()).into()))
.await,
)
} else {
None
};
);
}
if let Err(err) = self.upgrade_region(ctx).await {
retry += 1;
ctx.update_operations_elapsed(timer);
@@ -332,17 +374,17 @@ mod tests {
schema: "public".into(),
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_id: RegionId::new(1024, 1),
region_ids: vec![RegionId::new(1024, 1)],
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
}
}
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
let table_info =
new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
let region_id = ctx.persistent_ctx.region_ids[0];
let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(ctx.persistent_ctx.region_id),
region: Region::new_test(region_id),
leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
..Default::default()