From 40bd6ef79ef4da77229f9f162cbdb89ddda91700 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 21 Nov 2025 11:25:44 +0800 Subject: [PATCH] chore: pick #7199 and #7266 to `release/v0.15` (#7267) * fix: correct leader state reset and region migration locking consistency (#7199) * fix(meta): remove table route cache in region migration ctx Signed-off-by: WenyXu * fix: fix unit tests Signed-off-by: WenyXu * chore: fix clippy Signed-off-by: WenyXu * fix: fix campaign reset not clearing leader state-s Signed-off-by: WenyXu * feat: gracefully handle region lease renewal errors Signed-off-by: WenyXu --------- Signed-off-by: WenyXu * chore: add tests for election reset and region lease failure handling (#7266) Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/meta-srv/src/election/rds/mysql.rs | 44 ++++- src/meta-srv/src/election/rds/postgres.rs | 65 +++++-- .../src/handler/region_lease_handler.rs | 170 +++++++++++++++--- src/meta-srv/src/metasrv/builder.rs | 3 +- .../src/procedure/region_migration.rs | 59 +++--- .../downgrade_leader_region.rs | 66 +------ .../rollback_downgraded_region.rs | 103 +---------- .../upgrade_candidate_region.rs | 70 +------- src/meta-srv/src/region/supervisor.rs | 36 ++-- src/meta-srv/src/service/heartbeat.rs | 1 + src/meta-srv/src/state.rs | 6 + 11 files changed, 299 insertions(+), 324 deletions(-) diff --git a/src/meta-srv/src/election/rds/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs index d9a9405cad..de531adc38 100644 --- a/src/meta-srv/src/election/rds/mysql.rs +++ b/src/meta-srv/src/election/rds/mysql.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY}; -use common_telemetry::{error, warn}; +use common_telemetry::{error, info, warn}; use common_time::Timestamp; use snafu::{ensure, OptionExt, ResultExt}; use sqlx::mysql::{MySqlArguments, MySqlRow}; @@ -645,6 +645,13 @@ impl Election for MySqlElection { } async fn reset_campaign(&self) { + info!("Resetting campaign"); + if self.is_leader.load(Ordering::Relaxed) { + if let Err(err) = self.step_down_without_lock().await { + error!(err; "Failed to step down without lock"); + } + info!("Step down without lock successfully, due to reset campaign"); + } if let Err(err) = self.client.lock().await.reset_client().await { error!(err; "Failed to reset client"); } @@ -1639,6 +1646,41 @@ mod tests { drop_table(&leader_mysql_election.client, table_name).await; } + #[tokio::test] + async fn test_reset_campaign() { + maybe_skip_mysql_integration_test!(); + common_telemetry::init_default_ut_logging(); + let leader_value = "test_leader".to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_reset_campaign_greptime_metakv"; + let candidate_lease_ttl = Duration::from_secs(5); + let meta_lease_ttl = Duration::from_secs(2); + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(0); + let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); + + let (tx, _) = broadcast::channel(100); + let leader_mysql_election = MySqlElection { + leader_value, + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), + }; + leader_mysql_election + .is_leader + .store(true, Ordering::Relaxed); + leader_mysql_election.reset_campaign().await; + 
assert!(!leader_mysql_election.is_leader()); + drop_table(&leader_mysql_election.client, table_name).await; + } + #[tokio::test] async fn test_follower_action() { maybe_skip_mysql_integration_test!(); diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs index e216d987fc..a2a3c9c150 100644 --- a/src/meta-srv/src/election/rds/postgres.rs +++ b/src/meta-srv/src/election/rds/postgres.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY}; -use common_telemetry::{error, warn}; +use common_telemetry::{error, info, warn}; use common_time::Timestamp; use deadpool_postgres::{Manager, Pool}; use snafu::{ensure, OptionExt, ResultExt}; @@ -454,6 +454,13 @@ impl Election for PgElection { } async fn reset_campaign(&self) { + info!("Resetting campaign"); + if self.is_leader.load(Ordering::Relaxed) { + if let Err(err) = self.step_down_without_lock().await { + error!(err; "Failed to step down without lock"); + } + info!("Step down without lock successfully, due to reset campaign"); + } if let Err(err) = self.pg_client.write().await.reset_client().await { error!(err; "Failed to reset client"); } @@ -749,18 +756,12 @@ impl PgElection { key: key.clone(), ..Default::default() }; - if self - .is_leader - .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - if let Err(e) = self - .leader_watcher - .send(LeaderChangeMessage::StepDown(Arc::new(leader_key))) - { - error!(e; "Failed to send leader change message"); - } - } + send_leader_change_and_set_flags( + &self.is_leader, + &self.leader_infancy, + &self.leader_watcher, + LeaderChangeMessage::StepDown(Arc::new(leader_key)), + ); Ok(()) } @@ -1551,6 +1552,44 @@ mod tests { drop_table(&follower_pg_election, table_name).await; } + #[tokio::test] + async fn test_reset_campaign() { + maybe_skip_postgres_integration_test!(); + let leader_value = "test_leader".to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_reset_campaign_greptime_metakv"; + let candidate_lease_ttl = Duration::from_secs(5); + let execution_timeout = Duration::from_secs(10); + let statement_timeout = Duration::from_secs(10); + let meta_lease_ttl = Duration::from_secs(2); + let idle_session_timeout = Duration::from_secs(0); + let client = create_postgres_client( + Some(table_name), + execution_timeout, + idle_session_timeout, + statement_timeout, + ) + .await + .unwrap(); + + let (tx, _) = broadcast::channel(100); + let leader_pg_election = PgElection { + leader_value, + pg_client: RwLock::new(client), + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(28321, table_name).build(), + }; + leader_pg_election.is_leader.store(true, Ordering::Relaxed); + leader_pg_election.reset_campaign().await; + assert!(!leader_pg_election.is_leader()); + drop_table(&leader_pg_election, table_name).await; + } + #[tokio::test] async fn test_idle_session_timeout() { maybe_skip_postgres_integration_test!(); diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 2d88a79519..239d721b71 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role}; use async_trait::async_trait; use 
common_meta::key::TableMetadataManagerRef; use common_meta::region_keeper::MemoryRegionKeeperRef; +use common_telemetry::error; use store_api::region_engine::GrantedRegion; use store_api::storage::RegionId; @@ -83,36 +84,44 @@ impl HeartbeatHandler for RegionLeaseHandler { let regions = stat.regions(); let datanode_id = stat.id; - let RenewRegionLeasesResponse { - non_exists, - renewed, - } = self + match self .region_lease_keeper .renew_region_leases(datanode_id, ®ions) - .await?; + .await + { + Ok(RenewRegionLeasesResponse { + non_exists, + renewed, + }) => { + let renewed = if let Some(renewer) = &self.customized_region_lease_renewer { + renewer + .renew(ctx, renewed) + .into_iter() + .map(|region| region.into()) + .collect() + } else { + renewed + .into_iter() + .map(|(region_id, region_lease_info)| { + GrantedRegion::new(region_id, region_lease_info.role).into() + }) + .collect::>() + }; - let renewed = if let Some(renewer) = &self.customized_region_lease_renewer { - renewer - .renew(ctx, renewed) - .into_iter() - .map(|region| region.into()) - .collect() - } else { - renewed - .into_iter() - .map(|(region_id, region_lease_info)| { - GrantedRegion::new(region_id, region_lease_info.role).into() - }) - .collect::>() - }; - - acc.region_lease = Some(RegionLease { - regions: renewed, - duration_since_epoch: req.duration_since_epoch, - lease_seconds: self.region_lease_seconds, - closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(), - }); - acc.inactive_region_ids = non_exists; + acc.region_lease = Some(RegionLease { + regions: renewed, + duration_since_epoch: req.duration_since_epoch, + lease_seconds: self.region_lease_seconds, + closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(), + }); + acc.inactive_region_ids = non_exists; + } + Err(e) => { + error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions); + // If we throw error here, the datanode will be marked as failure by region failure handler. + // So we only log the error and continue. 
+ } + } Ok(HandleControl::Continue) } @@ -120,18 +129,27 @@ impl HeartbeatHandler for RegionLeaseHandler { #[cfg(test)] mod test { + use std::any::Any; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; use common_meta::distributed_time_constants; + use common_meta::error::Result as MetaResult; use common_meta::key::table_route::TableRouteValue; use common_meta::key::test_utils::new_test_table_info; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::txn::{Txn, TxnResponse}; + use common_meta::kv_backend::{KvBackend, TxnService}; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; + use common_meta::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, + BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, + PutResponse, RangeRequest, RangeResponse, + }; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; @@ -404,4 +422,102 @@ mod test { assert_eq!(granted, expected); } + + struct MockKvBackend; + + #[async_trait::async_trait] + impl TxnService for MockKvBackend { + type Error = common_meta::error::Error; + + async fn txn(&self, _txn: Txn) -> MetaResult { + unimplemented!() + } + + fn max_txn_ops(&self) -> usize { + unimplemented!() + } + } + + #[async_trait::async_trait] + impl KvBackend for MockKvBackend { + fn name(&self) -> &str { + "mock_kv_backend" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, _req: RangeRequest) -> MetaResult { + unimplemented!() + } + + async fn put(&self, _req: PutRequest) -> MetaResult { + unimplemented!() + } + + async fn batch_put(&self, _req: BatchPutRequest) -> MetaResult { + unimplemented!() + } + + async fn batch_get(&self, _req: BatchGetRequest) -> MetaResult { + common_meta::error::UnexpectedSnafu { + err_msg: "mock err", + } + .fail() + } + + async fn delete_range(&self, _req: DeleteRangeRequest) -> MetaResult { + unimplemented!() + } + + async fn batch_delete(&self, _req: BatchDeleteRequest) -> MetaResult { + unimplemented!() + } + } + + #[tokio::test] + async fn test_handle_renew_region_lease_failure() { + common_telemetry::init_default_ut_logging(); + let kvbackend = Arc::new(MockKvBackend); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend)); + + let datanode_id = 1; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let another_region_id = RegionId::new(table_id, region_number + 1); + let no_exist_region_id = RegionId::new(table_id, region_number + 2); + let peer = Peer::empty(datanode_id); + + let builder = MetasrvBuilder::new(); + let metasrv = builder.build().await.unwrap(); + let ctx = &mut metasrv.new_ctx(); + + let req = HeartbeatRequest { + duration_since_epoch: 1234, + ..Default::default() + }; + + let acc = &mut HeartbeatAccumulator::default(); + acc.stat = Some(Stat { + id: peer.id, + region_stats: vec![ + new_empty_region_stat(region_id, RegionRole::Leader), + new_empty_region_stat(another_region_id, RegionRole::Leader), + new_empty_region_stat(no_exist_region_id, RegionRole::Leader), + ], + ..Default::default() + }); + let handler = RegionLeaseHandler::new( + distributed_time_constants::REGION_LEASE_SECS, + table_metadata_manager.clone(), + Default::default(), + None, + ); + 
handler.handle(&req, ctx, acc).await.unwrap(); + + assert!(acc.region_lease.is_none()); + assert!(acc.inactive_region_ids.is_empty()); + } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index b001de2c7e..03f50a1040 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -345,7 +345,8 @@ impl MetasrvBuilder { region_migration_manager.clone(), runtime_switch_manager.clone(), peer_lookup_service.clone(), - ); + ) + .with_state(state.clone()); Some(RegionFailureHandler::new( region_supervisor, diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index ac87353818..584b61ef1f 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -38,7 +38,7 @@ use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use common_meta::kv_backend::ResettableKvBackendRef; -use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock}; +use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; use common_procedure::error::{ @@ -215,8 +215,6 @@ pub struct VolatileContext { /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue). opening_region_guard: Option, - /// `table_route` is stored via previous steps for future use. - table_route: Option>, /// `datanode_table` is stored via previous steps for future use. from_peer_datanode_table: Option, /// `table_info` is stored via previous steps for future use. @@ -383,29 +381,23 @@ impl Context { /// Retry: /// - Failed to retrieve the metadata of table. pub async fn get_table_route_value( - &mut self, - ) -> Result<&DeserializedValueWithBytes> { - let table_route_value = &mut self.volatile_ctx.table_route; + &self, + ) -> Result> { + let table_id = self.persistent_ctx.region_id.table_id(); + let table_route = self + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get_with_raw_bytes(table_id) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get TableRoute: {table_id}"), + })? + .context(error::TableRouteNotFoundSnafu { table_id })?; - if table_route_value.is_none() { - let table_id = self.persistent_ctx.region_id.table_id(); - let table_route = self - .table_metadata_manager - .table_route_manager() - .table_route_storage() - .get_with_raw_bytes(table_id) - .await - .context(error::TableMetadataManagerSnafu) - .map_err(BoxedError::new) - .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!("Failed to get TableRoute: {table_id}"), - })? - .context(error::TableRouteNotFoundSnafu { table_id })?; - - *table_route_value = Some(table_route); - } - - Ok(table_route_value.as_ref().unwrap()) + Ok(table_route) } /// Notifies the RegionSupervisor to register failure detectors of failed region. @@ -447,12 +439,6 @@ impl Context { .await; } - /// Removes the `table_route` of [VolatileContext], returns true if any. 
- pub fn remove_table_route_value(&mut self) -> bool { - let value = self.volatile_ctx.table_route.take(); - value.is_some() - } - /// Returns the `table_info` of [VolatileContext] if any. /// Otherwise, returns the value retrieved from remote. /// @@ -627,14 +613,13 @@ impl RegionMigrationProcedure { }) } - async fn rollback_inner(&mut self) -> Result<()> { + async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> { let _timer = METRIC_META_REGION_MIGRATION_EXECUTE .with_label_values(&["rollback"]) .start_timer(); let table_id = self.context.region_id().table_id(); let region_id = self.context.region_id(); - self.context.remove_table_route_value(); let table_metadata_manager = self.context.table_metadata_manager.clone(); let table_route = self.context.get_table_route_value().await?; @@ -647,9 +632,11 @@ impl RegionMigrationProcedure { .any(|route| route.is_leader_downgrading()); if downgraded { + let table_lock = TableLock::Write(region_id.table_id()).into(); + let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; info!("Rollbacking downgraded region leader table route, region: {region_id}"); table_metadata_manager - .update_leader_region_status(table_id, table_route, |route| { + .update_leader_region_status(table_id, &table_route, |route| { if route.region.id == region_id { Some(None) } else { @@ -676,8 +663,8 @@ impl Procedure for RegionMigrationProcedure { Self::TYPE_NAME } - async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> { - self.rollback_inner() + async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> { + self.rollback_inner(ctx) .await .map_err(ProcedureError::external) } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index a83dbd4b37..6d7123de76 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -46,7 +46,7 @@ impl UpdateMetadata { // TODO(weny): ensures the leader region peer is the `from_peer`. 
if let Err(err) = table_metadata_manager - .update_leader_region_status(table_id, current_table_route_value, |route| { + .update_leader_region_status(table_id, ¤t_table_route_value, |route| { if route.region.id == region_id && route .leader_peer @@ -61,7 +61,6 @@ impl UpdateMetadata { .await .context(error::TableMetadataManagerSnafu) { - ctx.remove_table_route_value(); return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu { reason: format!( "Failed to update the table route during the downgrading leader region, region_id: {region_id}, from_peer_id: {from_peer_id}" @@ -69,8 +68,6 @@ impl UpdateMetadata { }); } - ctx.remove_table_route_value(); - Ok(()) } } @@ -81,7 +78,7 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; - use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; + use common_meta::rpc::router::{Region, RegionRoute}; use store_api::storage::RegionId; use crate::error::Error; @@ -115,63 +112,6 @@ mod tests { assert!(!err.is_retryable()); } - #[tokio::test] - async fn test_failed_to_update_table_route_error() { - let state = UpdateMetadata::Downgrade; - let persistent_context = new_persistent_context(); - let from_peer = persistent_context.from_peer.clone(); - - let env = TestingEnv::new(); - let mut ctx = env.context_factory().new_context(persistent_context); - let table_id = ctx.region_id().table_id(); - - let table_info = new_test_table_info(1024, vec![1, 2]).into(); - let region_routes = vec![ - RegionRoute { - region: Region::new_test(RegionId::new(1024, 1)), - leader_peer: Some(from_peer.clone()), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(RegionId::new(1024, 2)), - leader_peer: Some(Peer::empty(4)), - ..Default::default() - }, - ]; - - env.create_physical_table_metadata(table_info, region_routes) - .await; - - let table_metadata_manager = env.table_metadata_manager(); - let original_table_route = table_metadata_manager - .table_route_manager() - .table_route_storage() - .get_with_raw_bytes(table_id) - .await - .unwrap() - .unwrap(); - - // modifies the table route. - table_metadata_manager - .update_leader_region_status(table_id, &original_table_route, |route| { - if route.region.id == RegionId::new(1024, 2) { - Some(Some(LeaderState::Downgrading)) - } else { - None - } - }) - .await - .unwrap(); - - // sets the old table route. - ctx.volatile_ctx.table_route = Some(original_table_route); - - let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err(); - assert!(ctx.volatile_ctx.table_route.is_none()); - assert!(err.is_retryable()); - assert!(format!("{err:?}").contains("Failed to update the table route")); - } - #[tokio::test] async fn test_only_downgrade_from_peer() { let mut state = Box::new(UpdateMetadata::Downgrade); @@ -212,7 +152,6 @@ mod tests { // It should remain unchanged. 
assert_eq!(latest_table_route.version().unwrap(), 0); assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgrading()); - assert!(ctx.volatile_ctx.table_route.is_none()); } #[tokio::test] @@ -254,6 +193,5 @@ mod tests { .unwrap(); assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgrading()); - assert!(ctx.volatile_ctx.table_route.is_none()); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index fe0b5c28da..fccfc18419 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -35,7 +35,7 @@ impl UpdateMetadata { let current_table_route_value = ctx.get_table_route_value().await?; if let Err(err) = table_metadata_manager - .update_leader_region_status(table_id, current_table_route_value, |route| { + .update_leader_region_status(table_id, ¤t_table_route_value, |route| { if route.region.id == region_id { Some(None) } else { @@ -45,14 +45,12 @@ impl UpdateMetadata { .await .context(error::TableMetadataManagerSnafu) { - ctx.remove_table_route_value(); return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu { reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"), }); } ctx.register_failure_detectors().await; - ctx.remove_table_route_value(); Ok(()) } @@ -61,7 +59,6 @@ impl UpdateMetadata { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; - use std::sync::Arc; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -73,7 +70,6 @@ mod tests { use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; - use crate::region::supervisor::RegionFailureDetectorControl; fn new_persistent_context() -> PersistentContext { test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) @@ -93,101 +89,6 @@ mod tests { assert!(!err.is_retryable()); } - #[tokio::test] - async fn test_update_table_route_with_retry() { - let state = UpdateMetadata::Rollback; - let persistent_context = new_persistent_context(); - let from_peer = persistent_context.from_peer.clone(); - - let env = TestingEnv::new(); - let mut ctx = env.context_factory().new_context(persistent_context); - let (tx, mut rx) = tokio::sync::mpsc::channel(8); - ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx)); - let table_id = ctx.region_id().table_id(); - - let table_info = new_test_table_info(1024, vec![1, 2, 3]).into(); - let region_routes = vec![ - RegionRoute { - region: Region::new_test(RegionId::new(1024, 1)), - leader_peer: Some(from_peer.clone()), - leader_state: Some(LeaderState::Downgrading), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(RegionId::new(1024, 2)), - leader_peer: Some(Peer::empty(4)), - leader_state: Some(LeaderState::Downgrading), - ..Default::default() - }, - RegionRoute { - region: Region::new_test(RegionId::new(1024, 3)), - leader_peer: Some(Peer::empty(5)), - ..Default::default() - }, - ]; - - let expected_region_routes = { - let mut region_routes = region_routes.clone(); - region_routes[0].leader_state = None; - 
region_routes[1].leader_state = None; - region_routes - }; - - env.create_physical_table_metadata(table_info, region_routes) - .await; - - let table_metadata_manager = env.table_metadata_manager(); - let old_table_route = table_metadata_manager - .table_route_manager() - .table_route_storage() - .get_with_raw_bytes(table_id) - .await - .unwrap() - .unwrap(); - - // modifies the table route. - table_metadata_manager - .update_leader_region_status(table_id, &old_table_route, |route| { - if route.region.id == RegionId::new(1024, 2) { - Some(None) - } else { - None - } - }) - .await - .unwrap(); - - ctx.volatile_ctx.table_route = Some(old_table_route); - - let err = state - .rollback_downgraded_region(&mut ctx) - .await - .unwrap_err(); - assert!(ctx.volatile_ctx.table_route.is_none()); - assert!(err.is_retryable()); - assert!(format!("{err:?}").contains("Failed to update the table route")); - assert_eq!(rx.len(), 0); - state.rollback_downgraded_region(&mut ctx).await.unwrap(); - let event = rx.try_recv().unwrap(); - let detecting_regions = event.into_region_failure_detectors(); - assert_eq!( - detecting_regions, - vec![(from_peer.id, ctx.persistent_ctx.region_id)] - ); - - let table_route = table_metadata_manager - .table_route_manager() - .table_route_storage() - .get(table_id) - .await - .unwrap() - .unwrap(); - assert_eq!( - &expected_region_routes, - table_route.region_routes().unwrap() - ); - } - #[tokio::test] async fn test_next_migration_end_state() { let mut state = Box::new(UpdateMetadata::Rollback); @@ -238,8 +139,6 @@ mod tests { .downcast_ref::() .unwrap(); - assert!(ctx.volatile_ctx.table_route.is_none()); - let table_route = table_metadata_manager .table_route_manager() .table_route_storage() diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 42edb52671..92760d3f30 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -160,7 +160,7 @@ impl UpdateMetadata { region_options: region_options.clone(), region_wal_options: region_wal_options.clone(), }, - table_route_value, + &table_route_value, region_routes, ®ion_options, ®ion_wal_options, @@ -168,13 +168,11 @@ impl UpdateMetadata { .await .context(error::TableMetadataManagerSnafu) { - ctx.remove_table_route_value(); return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu { reason: format!("Failed to update the table route during the upgrading candidate region: {region_id}"), }); }; - ctx.remove_table_route_value(); ctx.deregister_failure_detectors().await; // Consumes the guard. 
ctx.volatile_ctx.opening_region_guard.take(); @@ -304,71 +302,6 @@ mod tests { assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2); } - #[tokio::test] - async fn test_failed_to_update_table_route_error() { - let state = UpdateMetadata::Upgrade; - let env = TestingEnv::new(); - let persistent_context = new_persistent_context(); - let mut ctx = env.context_factory().new_context(persistent_context); - let opening_keeper = MemoryRegionKeeper::default(); - - let table_id = 1024; - let table_info = new_test_table_info(table_id, vec![1]).into(); - let region_routes = vec![ - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 1)), - leader_peer: Some(Peer::empty(1)), - follower_peers: vec![Peer::empty(5), Peer::empty(3)], - leader_state: Some(LeaderState::Downgrading), - leader_down_since: Some(current_time_millis()), - }, - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 2)), - leader_peer: Some(Peer::empty(4)), - leader_state: Some(LeaderState::Downgrading), - ..Default::default() - }, - ]; - - env.create_physical_table_metadata(table_info, region_routes) - .await; - - let table_metadata_manager = env.table_metadata_manager(); - let original_table_route = table_metadata_manager - .table_route_manager() - .table_route_storage() - .get_with_raw_bytes(table_id) - .await - .unwrap() - .unwrap(); - - // modifies the table route. - table_metadata_manager - .update_leader_region_status(table_id, &original_table_route, |route| { - if route.region.id == RegionId::new(1024, 2) { - // Removes the status. - Some(None) - } else { - None - } - }) - .await - .unwrap(); - - // sets the old table route. - ctx.volatile_ctx.table_route = Some(original_table_route); - let guard = opening_keeper - .register(2, RegionId::new(table_id, 1)) - .unwrap(); - ctx.volatile_ctx.opening_region_guard = Some(guard); - let err = state.upgrade_candidate_region(&mut ctx).await.unwrap_err(); - - assert!(ctx.volatile_ctx.table_route.is_none()); - assert!(ctx.volatile_ctx.opening_region_guard.is_some()); - assert!(err.is_retryable()); - assert!(format!("{err:?}").contains("Failed to update the table route")); - } - #[tokio::test] async fn test_check_metadata() { let state = UpdateMetadata::Upgrade; @@ -486,7 +419,6 @@ mod tests { .unwrap(); let region_routes = table_route.region_routes().unwrap(); - assert!(ctx.volatile_ctx.table_route.is_none()); assert!(ctx.volatile_ctx.opening_region_guard.is_none()); assert_eq!(region_routes.len(), 1); assert!(!region_routes[0].is_leader_downgrading()); diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index be904dc97f..f6aa891b8b 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -42,6 +42,7 @@ use crate::procedure::region_migration::{ }; use crate::region::failure_detector::RegionFailureDetector; use crate::selector::SelectorOptions; +use crate::state::StateRef; /// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode. 
/// It includes identifiers for the cluster and datanode, a list of regions being monitored, @@ -86,16 +87,6 @@ pub(crate) enum Event { Dump(tokio::sync::oneshot::Sender), } -#[cfg(test)] -impl Event { - pub(crate) fn into_region_failure_detectors(self) -> Vec { - match self { - Self::RegisterFailureDetectors(detecting_regions) => detecting_regions, - _ => unreachable!(), - } - } -} - impl Debug for Event { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -228,6 +219,8 @@ pub struct RegionSupervisor { runtime_switch_manager: RuntimeSwitchManagerRef, /// Peer lookup service peer_lookup: PeerLookupServiceRef, + /// The meta state, used to check if the current metasrv is the leader. + state: Option, } /// Controller for managing failure detectors for regions. @@ -308,12 +301,29 @@ impl RegionSupervisor { region_migration_manager, runtime_switch_manager, peer_lookup, + state: None, } } + /// Sets the meta state. + pub(crate) fn with_state(mut self, state: StateRef) -> Self { + self.state = Some(state); + self + } + /// Runs the main loop. pub(crate) async fn run(&mut self) { while let Some(event) = self.receiver.recv().await { + if let Some(state) = self.state.as_ref() + && !state.read().unwrap().is_leader() + { + warn!( + "The current metasrv is not the leader, ignore {:?} event", + event + ); + continue; + } + match event { Event::Tick => { let regions = self.detect_region_failure(); @@ -326,7 +336,10 @@ impl RegionSupervisor { self.deregister_failure_detectors(detecting_regions).await } Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat), - Event::Clear => self.clear(), + Event::Clear => { + self.clear(); + info!("Region supervisor is initialized."); + } #[cfg(test)] Event::Dump(sender) => { let _ = sender.send(self.failure_detector.dump()); @@ -759,6 +772,7 @@ pub(crate) mod tests { while let Ok(event) = rx.try_recv() { assert_matches!(event, Event::Tick | Event::Clear); } + assert!(ticker.tick_handle.lock().unwrap().is_none()); } } diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index d1a3a0e636..0c45a98939 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -79,6 +79,7 @@ impl heartbeat_server::Heartbeat for Metasrv { let res = handler_group .handle(req, ctx.clone()) .await + .inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", )) .map_err(|e| e.into()); is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader()); diff --git a/src/meta-srv/src/state.rs b/src/meta-srv/src/state.rs index 0466644c67..35d10d5bd7 100644 --- a/src/meta-srv/src/state.rs +++ b/src/meta-srv/src/state.rs @@ -75,6 +75,12 @@ impl State { }) } + /// Returns true if the current state is a leader. + pub fn is_leader(&self) -> bool { + matches!(self, State::Leader(_)) + } + + /// Returns true if the leader cache is enabled. pub fn enable_leader_cache(&self) -> bool { match &self { State::Leader(leader) => leader.enable_leader_cache,
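
The patch above couples two behaviors: `reset_campaign` now steps a node down if it still believes it is the leader, and `RegionSupervisor::run` drops events whenever the local metasrv is no longer the leader. Below is a minimal, self-contained sketch of that pattern — not the meta-srv code itself. `Supervisor`, `Event`, `handle_event`, and the standalone `reset_campaign` function are simplified stand-ins for the real types (which use tokio channels, `StateRef`, and the election trait), kept std-only so the sketch compiles on its own.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Simplified stand-in for the supervisor's event type.
#[derive(Debug)]
enum Event {
    Tick,
    Clear,
}

// Simplified stand-in for RegionSupervisor: it only carries the shared leader flag.
struct Supervisor {
    is_leader: Arc<AtomicBool>,
}

impl Supervisor {
    // Pattern from `RegionSupervisor::run`: a non-leader metasrv must not act on
    // failure-detection events, so they are logged and dropped instead of handled.
    fn handle_event(&self, event: Event) {
        if !self.is_leader.load(Ordering::Relaxed) {
            eprintln!("not the leader, ignoring {event:?}");
            return;
        }
        match event {
            Event::Tick => eprintln!("detecting region failures"),
            Event::Clear => eprintln!("supervisor state cleared"),
        }
    }
}

// Pattern from `reset_campaign`: if this node still holds leadership, step down
// before resetting the election client so stale leader state cannot outlive a
// broken backend session.
fn reset_campaign(is_leader: &AtomicBool) {
    if is_leader
        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
    {
        eprintln!("stepped down due to campaign reset");
    }
    // ... reset the underlying client/connection here ...
}

fn main() {
    let is_leader = Arc::new(AtomicBool::new(true));
    let supervisor = Supervisor {
        is_leader: Arc::clone(&is_leader),
    };

    supervisor.handle_event(Event::Tick); // acted on: still the leader
    reset_campaign(&is_leader); // steps down before resetting the client
    supervisor.handle_event(Event::Clear); // ignored: leadership was cleared
}
```

The design point mirrored here is that leadership is re-checked per event at processing time, so an event queued before a step-down cannot be acted on afterwards with stale leader state.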
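The `region_lease_handler.rs` hunk changes a hard failure into a logged, non-fatal one: if lease renewal errors, the heartbeat continues and simply grants no lease that round, rather than letting the error mark the datanode as failed. The sketch below shows the same error-handling shape under simplified assumptions — `Accumulator`, `renew_leases`, and `handle_heartbeat` are illustrative stand-ins, not the real `HeartbeatAccumulator`, `RenewRegionLeasesResponse`, or metasrv error types.

```rust
// Stand-in for the heartbeat accumulator: which leases were renewed and which
// regions are considered inactive this round.
#[derive(Default, Debug)]
struct Accumulator {
    renewed: Vec<u64>,
    inactive: Vec<u64>,
}

// Stand-in for the lease keeper: a metadata lookup that may fail transiently.
fn renew_leases(datanode_id: u64, regions: &[u64]) -> Result<(Vec<u64>, Vec<u64>), String> {
    if datanode_id == 0 {
        return Err("metadata backend unavailable".to_string());
    }
    Ok((regions.to_vec(), Vec::new()))
}

fn handle_heartbeat(datanode_id: u64, regions: &[u64], acc: &mut Accumulator) {
    match renew_leases(datanode_id, regions) {
        Ok((renewed, non_exists)) => {
            acc.renewed = renewed;
            acc.inactive = non_exists;
        }
        Err(err) => {
            // Pattern from the patch: log the renewal failure and continue, so the
            // datanode is not spuriously marked as failed; the accumulator simply
            // carries no lease for this heartbeat.
            eprintln!("failed to renew region leases for datanode {datanode_id}: {err}");
        }
    }
}

fn main() {
    let mut acc = Accumulator::default();

    handle_heartbeat(0, &[1, 2, 3], &mut acc); // failure: no lease, no error bubbles up
    assert!(acc.renewed.is_empty());
    assert!(acc.inactive.is_empty());

    handle_heartbeat(1, &[1, 2, 3], &mut acc); // success: leases granted as usual
    assert_eq!(acc.renewed, vec![1, 2, 3]);
    println!("{acc:?}");
}
```

This matches the new `test_handle_renew_region_lease_failure` expectation in the patch: after a failed renewal the lease field stays unset and no regions are marked inactive.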