fix: improve region migration error handling and optimize leader downgrade with lease check (#6026)

* fix(meta): improve region migration error handling and lease management

* chore: refine comments

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* feat: consume opening_region_guard
Weny Xu
2025-05-07 08:54:35 +08:00
committed by GitHub
parent 07e84a28a3
commit df31f0b9ec
11 changed files with 284 additions and 42 deletions
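The key change behind the "lease check" in the title lands in the downgrade-leader step further down: when the old leader datanode is unreachable, the procedure looks up that datanode's last heartbeat timestamp in the in-memory lease store and then either clears the leader-region lease deadline (datanode gone for at least a full lease period) or shortens it to the remaining lease time. The following is a simplified, self-contained sketch of that decision; the helper name, the enum, and the 90-second lease value are illustrative assumptions, not code from this commit.

use std::time::Duration;

// Assumed lease period for illustration only; the real code uses
// common_meta::distributed_time_constants::REGION_LEASE_SECS.
const REGION_LEASE_SECS: u64 = 90;

/// What to do with the leader-region lease deadline after reading the old
/// leader's last heartbeat time (hypothetical helper mirroring
/// `update_leader_region_lease_deadline` in the diff below).
enum LeaseDeadlineAction {
    /// Disconnected for at least a full lease period: clear the deadline,
    /// there is no live lease left to wait out.
    Clear,
    /// Wait only for the remaining portion of the lease instead of a full period.
    WaitFor(Duration),
    /// Timestamp lies in the future (clock skew): leave the deadline untouched.
    KeepAsIs,
}

fn lease_deadline_action(now_millis: i64, last_connection_at_millis: i64) -> LeaseDeadlineAction {
    let elapsed = now_millis - last_connection_at_millis;
    let lease_millis = (REGION_LEASE_SECS * 1000) as i64;
    if elapsed >= lease_millis {
        LeaseDeadlineAction::Clear
    } else if elapsed > 0 {
        LeaseDeadlineAction::WaitFor(Duration::from_millis((lease_millis - elapsed) as u64))
    } else {
        LeaseDeadlineAction::KeepAsIs
    }
}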

View File

@@ -336,6 +336,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to downgrade region leader, region: {}", region_id))]
DowngradeLeader {
region_id: RegionId,
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: BoxedError,
},
#[snafu(display("Region's leader peer changed: {}", msg))]
LeaderPeerChanged {
msg: String,
@@ -956,7 +965,7 @@ impl ErrorExt for Error {
Error::StartTelemetryTask { source, .. } => source.status_code(),
Error::NextSequence { source, .. } => source.status_code(),
Error::DowngradeLeader { source, .. } => source.status_code(),
Error::RegisterProcedureLoader { source, .. } => source.status_code(),
Error::SubmitDdlTask { source, .. } => source.status_code(),
Error::ConvertProtoData { source, .. }

View File

@@ -113,3 +113,88 @@ async fn put_into_memory_store(ctx: &mut Context, key: Vec<u8>, value: Vec<u8>,
warn!(err; "Failed to update lease KV, peer: {peer:?}");
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::RequestHeader;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode::Stat;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use super::*;
use crate::cluster::MetaPeerClientBuilder;
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::lease::find_datanode_lease_value;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
#[tokio::test]
async fn test_put_into_memory_store() {
let in_memory = Arc::new(MemoryKvBackend::new());
let kv_backend = Arc::new(MemoryKvBackend::new());
let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::with_always_leader(
kv_backend.clone(),
));
let seq = SequenceBuilder::new("test_seq", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(Pushers::default(), seq);
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(in_memory.clone())
.build()
.map(Arc::new)
// Safety: all required fields set at initialization
.unwrap();
let ctx = Context {
server_addr: "127.0.0.1:0000".to_string(),
in_memory,
kv_backend: kv_backend.clone(),
leader_cached_kv_backend,
meta_peer_client,
mailbox,
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
leader_region_registry: Arc::new(LeaderRegionRegistry::new()),
};
let handler = DatanodeKeepLeaseHandler;
handle_request_many_times(ctx.clone(), &handler, 1).await;
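// The keep-lease handler should have stored a lease value for datanode 1 in the
// in-memory backend; read it back and verify what was written.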
let lease_value = find_datanode_lease_value(1, &ctx.in_memory)
.await
.unwrap()
.unwrap();
assert_eq!(lease_value.node_addr, "127.0.0.1:1");
assert!(lease_value.timestamp_millis != 0);
}
async fn handle_request_many_times(
mut ctx: Context,
handler: &DatanodeKeepLeaseHandler,
loop_times: i32,
) {
let req = HeartbeatRequest {
header: Some(RequestHeader::new(1, Role::Datanode, HashMap::new())),
peer: Some(Peer::new(1, "127.0.0.1:1")),
..Default::default()
};
for i in 1..=loop_times {
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {
id: 101,
region_num: i as _,
..Default::default()
}),
..Default::default()
};
handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
}
}
}

View File

@@ -16,14 +16,14 @@ use std::collections::HashMap;
use std::hash::Hash;
use common_error::ext::BoxedError;
- use common_meta::kv_backend::KvBackend;
+ use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::{util, DatanodeId, FlownodeId};
use common_time::util as time_util;
use snafu::ResultExt;
use crate::cluster::MetaPeerClientRef;
- use crate::error::{Error, Result};
+ use crate::error::{Error, KvBackendSnafu, Result};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
@@ -33,6 +33,28 @@ fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool {
}
}
/// Returns the lease value of the given datanode id; returns `None` if the datanode is not found.
pub async fn find_datanode_lease_value(
datanode_id: DatanodeId,
in_memory_key: &ResettableKvBackendRef,
) -> Result<Option<LeaseValue>> {
let lease_key = DatanodeLeaseKey {
node_id: datanode_id,
};
let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
let Some(kv) = in_memory_key
.get(&lease_key_bytes)
.await
.context(KvBackendSnafu)?
else {
return Ok(None);
};
let lease_value: LeaseValue = kv.value.try_into()?;
Ok(Some(lease_value))
}
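// Note (illustrative, not part of this diff): the value read here is the lease entry
// that the datanode keep-lease heartbeat handler writes into the in-memory backend;
// the downgrade step further below calls `find_datanode_lease_value(leader.id, &ctx.in_memory)`
// to obtain the old leader's last heartbeat time.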
/// look up [`Peer`] given [`ClusterId`] and [`DatanodeId`], will only return if it's alive under given `lease_secs`
pub async fn lookup_datanode_peer(
datanode_id: DatanodeId,

View File

@@ -311,6 +311,7 @@ impl MetasrvBuilder {
let region_migration_manager = Arc::new(RegionMigrationManager::new(
procedure_manager.clone(),
DefaultContextFactory::new(
in_memory.clone(),
table_metadata_manager.clone(),
memory_region_keeper.clone(),
region_failure_detector_controller.clone(),

View File

@@ -37,6 +37,7 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
@@ -266,6 +267,7 @@ pub trait ContextFactory {
#[derive(Clone)]
pub struct DefaultContextFactory {
volatile_ctx: VolatileContext,
in_memory_key: ResettableKvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,
@@ -277,6 +279,7 @@ pub struct DefaultContextFactory {
impl DefaultContextFactory {
/// Returns a [`DefaultContextFactory`].
pub fn new(
in_memory_key: ResettableKvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,
@@ -286,6 +289,7 @@ impl DefaultContextFactory {
) -> Self {
Self {
volatile_ctx: VolatileContext::default(),
in_memory_key,
table_metadata_manager,
opening_region_keeper,
region_failure_detector_controller,
@@ -301,6 +305,7 @@ impl ContextFactory for DefaultContextFactory {
Context {
persistent_ctx,
volatile_ctx: self.volatile_ctx,
in_memory: self.in_memory_key,
table_metadata_manager: self.table_metadata_manager,
opening_region_keeper: self.opening_region_keeper,
region_failure_detector_controller: self.region_failure_detector_controller,
@@ -315,6 +320,7 @@ impl ContextFactory for DefaultContextFactory {
pub struct Context {
persistent_ctx: PersistentContext,
volatile_ctx: VolatileContext,
in_memory: ResettableKvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,
@@ -417,8 +423,8 @@ impl Context {
/// Notifies the RegionSupervisor to deregister failure detectors.
///
- /// The original failure detectors was removed once the procedure was triggered.
- /// However, the `from_peer` may still send the heartbeats contains the failed region.
+ /// The original failure detectors won't be removed once the procedure was triggered.
+ /// We need to deregister the failure detectors for the original region if the procedure is finished.
pub async fn deregister_failure_detectors(&self) {
let datanode_id = self.persistent_ctx.from_peer.id;
let region_id = self.persistent_ctx.region_id;
@@ -428,6 +434,19 @@ impl Context {
.await;
}
/// Notifies the RegionSupervisor to deregister failure detectors for the candidate region on the destination peer.
///
/// The candidate region may have been created on the destination peer,
/// so we need to deregister the failure detectors for the candidate region if the procedure is aborted.
pub async fn deregister_failure_detectors_for_candidate_region(&self) {
let to_peer_id = self.persistent_ctx.to_peer.id;
let region_id = self.persistent_ctx.region_id;
self.region_failure_detector_controller
.deregister_failure_detectors(vec![(to_peer_id, region_id)])
.await;
}
/// Removes the `table_route` of [VolatileContext], returns true if any.
pub fn remove_table_route_value(&mut self) -> bool {
let value = self.volatile_ctx.table_route.take();
@@ -674,30 +693,38 @@ impl Procedure for RegionMigrationProcedure {
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&[name])
.start_timer();
- let (next, status) = state.next(&mut self.context, ctx).await.map_err(|e| {
- if e.is_retryable() {
- METRIC_META_REGION_MIGRATION_ERROR
- .with_label_values(&[name, "retryable"])
- .inc();
- ProcedureError::retry_later(e)
- } else {
- error!(
- e;
- "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
- self.context.region_id(),
- self.context.persistent_ctx.from_peer,
- self.context.persistent_ctx.to_peer,
- self.context.volatile_ctx.metrics,
- );
- METRIC_META_REGION_MIGRATION_ERROR
- .with_label_values(&[name, "external"])
- .inc();
- ProcedureError::external(e)
+ match state.next(&mut self.context, ctx).await {
+ Ok((next, status)) => {
+ *state = next;
+ Ok(status)
}
- })?;
- *state = next;
- Ok(status)
+ Err(e) => {
+ if e.is_retryable() {
+ METRIC_META_REGION_MIGRATION_ERROR
+ .with_label_values(&[name, "retryable"])
+ .inc();
+ Err(ProcedureError::retry_later(e))
+ } else {
+ // Consumes the opening region guard before deregistering the failure detectors.
+ self.context.volatile_ctx.opening_region_guard.take();
+ self.context
+ .deregister_failure_detectors_for_candidate_region()
+ .await;
+ error!(
+ e;
+ "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
+ self.context.region_id(),
+ self.context.persistent_ctx.from_peer,
+ self.context.persistent_ctx.to_peer,
+ self.context.volatile_ctx.metrics,
+ );
+ METRIC_META_REGION_MIGRATION_ERROR
+ .with_label_values(&[name, "external"])
+ .inc();
+ Err(ProcedureError::external(e))
+ }
+ }
+ }
}
fn dump(&self) -> ProcedureResult<String> {

View File

@@ -16,18 +16,21 @@ use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{sleep, Instant};
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::lease::find_datanode_lease_value;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
@@ -66,19 +69,32 @@ impl State for DowngradeLeaderRegion {
match self.downgrade_region_with_retry(ctx).await {
Ok(_) => {
- // Do nothing
+ info!(
+ "Downgraded region leader success, region: {}",
+ ctx.persistent_ctx.region_id
+ );
}
Err(error::Error::ExceededDeadline { .. }) => {
info!(
"Downgrade region leader exceeded deadline, region: {}",
ctx.persistent_ctx.region_id
);
// Rolls back the metadata if the procedure has timed out
return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
}
Err(err) => {
error!(err; "Occurs non-retryable error");
error!(err; "Occurs non-retryable error, region: {}", ctx.persistent_ctx.region_id);
if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
info!(
"Running into the downgrade leader slow path, sleep until {:?}",
deadline
"Running into the downgrade region leader slow path, region: {}, sleep until {:?}",
ctx.persistent_ctx.region_id, deadline
);
tokio::time::sleep_until(*deadline).await;
} else {
warn!(
"Leader region lease deadline is not set, region: {}",
ctx.persistent_ctx.region_id
);
}
}
}
@@ -220,6 +236,61 @@ impl DowngradeLeaderRegion {
}
}
async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
let leader = &ctx.persistent_ctx.from_peer;
let last_connection_at = match find_datanode_lease_value(leader.id, &ctx.in_memory).await {
Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
Err(err) => {
error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
return;
}
};
if let Some(last_connection_at) = last_connection_at {
let now = current_time_millis();
let elapsed = now - last_connection_at;
let region_lease = Duration::from_secs(REGION_LEASE_SECS);
// It's safe to update the region leader lease deadline here because:
// 1. The old region leader has already been marked as downgraded in metadata,
// which means any attempts to renew its lease will be rejected.
// 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
// establishes a new heartbeat connection stream.
if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
ctx.volatile_ctx.reset_leader_region_lease_deadline();
info!(
"Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
leader,
last_connection_at,
region_lease,
ctx.persistent_ctx.region_id
);
} else if elapsed > 0 {
// `now - last_connection_at` < REGION_LEASE_SECS * 1000
let lease_timeout =
region_lease - Duration::from_millis((now - last_connection_at) as u64);
ctx.volatile_ctx.reset_leader_region_lease_deadline();
ctx.volatile_ctx
.set_leader_region_lease_deadline(lease_timeout);
info!(
"Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {}",
leader, last_connection_at, elapsed, ctx.volatile_ctx.leader_region_lease_deadline, ctx.persistent_ctx.region_id
);
} else {
warn!(
"Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {}",
leader, last_connection_at, now, ctx.persistent_ctx.region_id
)
}
} else {
warn!(
"Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {}",
leader, ctx.persistent_ctx.region_id
)
}
}
/// Downgrades a leader region.
///
/// Fast path:
@@ -240,13 +311,20 @@ impl DowngradeLeaderRegion {
retry += 1;
// Throws the error immediately if the procedure exceeded the deadline.
if matches!(err, error::Error::ExceededDeadline { .. }) {
error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
return Err(err);
} else if matches!(err, error::Error::PusherNotFound { .. }) {
// Throws the error immediately if the datanode is unreachable.
error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
self.update_leader_region_lease_deadline(ctx).await;
return Err(err);
} else if err.is_retryable() && retry < self.optimistic_retry {
error!("Failed to downgrade region, error: {err:?}, retry later");
error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id);
sleep(self.retry_initial_interval).await;
} else {
error!("Failed to downgrade region, error: {err:?}");
break;
return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
region_id: ctx.persistent_ctx.region_id,
})?;
}
} else {
ctx.update_operations_elapsed(timer);
@@ -544,7 +622,11 @@ mod tests {
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(5));
let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
- state.downgrade_region_with_retry(&mut ctx).await.unwrap();
+ let err = state
+ .downgrade_region_with_retry(&mut ctx)
+ .await
+ .unwrap_err();
+ assert_matches!(err, error::Error::DowngradeLeader { .. });
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
// Should remain no change.
assert_eq!(

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -59,8 +60,16 @@ impl State for RegionMigrationStart {
let from_peer = &ctx.persistent_ctx.from_peer;
if self.has_migrated(&region_route, to_peer)? {
info!(
"Region has been migrated, region: {:?}, to_peer: {:?}",
region_route.region.id, to_peer
);
Ok((Box::new(RegionMigrationEnd), Status::done()))
} else if self.invalid_leader_peer(&region_route, from_peer)? {
info!(
"Abort region migration, region:{:?}, unexpected leader peer: {:?}, expected: {:?}",
region_route.region.id, region_route.leader_peer, from_peer,
);
Ok((
Box::new(RegionMigrationAbort::new(&format!(
"Invalid region leader peer: {from_peer:?}, expected: {:?}",
@@ -152,7 +161,7 @@ impl RegionMigrationStart {
fn has_migrated(&self, region_route: &RegionRoute, to_peer: &Peer) -> Result<bool> {
let region_id = region_route.region.id;
- let region_opened = region_route
+ let region_migrated = region_route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
@@ -160,8 +169,7 @@ impl RegionMigrationStart {
})?
.id
== to_peer.id;
- Ok(region_opened)
+ Ok(region_migrated)
}
}

View File

@@ -121,6 +121,7 @@ impl TestingEnv {
table_metadata_manager: self.table_metadata_manager.clone(),
opening_region_keeper: self.opening_region_keeper.clone(),
volatile_ctx: Default::default(),
in_memory_key: Arc::new(MemoryKvBackend::default()),
mailbox: self.mailbox_ctx.mailbox().clone(),
server_addr: self.server_addr.to_string(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),

View File

@@ -360,10 +360,9 @@ impl RegionSupervisor {
Ok(false) => {}
Ok(true) => {
warn!(
"Skipping failover since maintenance mode is enabled. Removing failure detectors for regions: {:?}",
"Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
regions
);
- self.deregister_failure_detectors(regions).await;
return;
}
Err(err) => {

View File

@@ -51,6 +51,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Utilizes the short circuit evaluation.
let region = if !is_empty_memtable || region.manifest_ctx.has_update().await? {
if !is_empty_memtable {
warn!("Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
region.region_id,
region.manifest_ctx.manifest_version().await,
region.version_control.current().last_entry_id
);
}
self.reopen_region(&region).await?
} else {
region

View File

@@ -150,9 +150,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
};
let version = region.version();
if !version.memtables.is_empty() {
let current = region.version_control.current();
warn!(
"Region {} memtables is not empty, which should not happen, manifest version: {}",
region.region_id, manifest.manifest_version
"Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
region.region_id, manifest.manifest_version, current.last_entry_id
);
}
let region_options = version.options.clone();