diff --git a/src/meta-srv/src/election/rds/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs
index 014923c7c3..a15d1bac51 100644
--- a/src/meta-srv/src/election/rds/mysql.rs
+++ b/src/meta-srv/src/election/rds/mysql.rs
@@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 
 use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
-use common_telemetry::{error, warn};
+use common_telemetry::{error, info, warn};
 use common_time::Timestamp;
 use snafu::{OptionExt, ResultExt, ensure};
 use sqlx::mysql::{MySqlArguments, MySqlRow};
@@ -645,6 +645,13 @@ impl Election for MySqlElection {
     }
 
     async fn reset_campaign(&self) {
+        info!("Resetting campaign");
+        if self.is_leader.load(Ordering::Relaxed) {
+            if let Err(err) = self.step_down_without_lock().await {
+                error!(err; "Failed to step down without lock");
+            }
+            info!("Step down without lock successfully, due to reset campaign");
+        }
         if let Err(err) = self.client.lock().await.reset_client().await {
             error!(err; "Failed to reset client");
         }
diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs
index beab74dac4..5d8a8bf2fa 100644
--- a/src/meta-srv/src/election/rds/postgres.rs
+++ b/src/meta-srv/src/election/rds/postgres.rs
@@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 
 use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
-use common_telemetry::{error, warn};
+use common_telemetry::{error, info, warn};
 use common_time::Timestamp;
 use deadpool_postgres::{Manager, Pool};
 use snafu::{OptionExt, ResultExt, ensure};
@@ -477,6 +477,13 @@ impl Election for PgElection {
     }
 
     async fn reset_campaign(&self) {
+        info!("Resetting campaign");
+        if self.is_leader.load(Ordering::Relaxed) {
+            if let Err(err) = self.step_down_without_lock().await {
+                error!(err; "Failed to step down without lock");
+            }
+            info!("Step down without lock successfully, due to reset campaign");
+        }
         if let Err(err) = self.pg_client.write().await.reset_client().await {
             error!(err; "Failed to reset client");
         }
@@ -774,16 +781,12 @@ impl PgElection {
             key: key.clone(),
             ..Default::default()
         };
-        if self
-            .is_leader
-            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
-            .is_ok()
-            && let Err(e) = self
-                .leader_watcher
-                .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
-        {
-            error!(e; "Failed to send leader change message");
-        }
+        send_leader_change_and_set_flags(
+            &self.is_leader,
+            &self.leader_infancy,
+            &self.leader_watcher,
+            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
+        );
 
         Ok(())
     }
diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs
index d7c2466e07..1dd49cd44e 100644
--- a/src/meta-srv/src/handler/region_lease_handler.rs
+++ b/src/meta-srv/src/handler/region_lease_handler.rs
@@ -19,6 +19,7 @@ use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
 use async_trait::async_trait;
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::region_keeper::MemoryRegionKeeperRef;
+use common_telemetry::error;
 use store_api::region_engine::GrantedRegion;
 use store_api::storage::RegionId;
 
@@ -83,36 +84,44 @@ impl HeartbeatHandler for RegionLeaseHandler {
         let regions = stat.regions();
         let datanode_id = stat.id;
 
-        let RenewRegionLeasesResponse {
-            non_exists,
-            renewed,
-        } = self
+        match self
             .region_lease_keeper
             .renew_region_leases(datanode_id, &regions)
-            .await?;
+            .await
+        {
+            Ok(RenewRegionLeasesResponse {
+                non_exists,
+                renewed,
+            }) => {
+                let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
+                    renewer
+                        .renew(ctx, renewed)
+                        .into_iter()
+                        .map(|region| region.into())
+                        .collect()
+                } else {
+                    renewed
+                        .into_iter()
+                        .map(|(region_id, region_lease_info)| {
+                            GrantedRegion::new(region_id, region_lease_info.role).into()
+                        })
+                        .collect::<Vec<_>>()
+                };
 
-        let renewed = if let Some(renewer) = &self.customized_region_lease_renewer {
-            renewer
-                .renew(ctx, renewed)
-                .into_iter()
-                .map(|region| region.into())
-                .collect()
-        } else {
-            renewed
-                .into_iter()
-                .map(|(region_id, region_lease_info)| {
-                    GrantedRegion::new(region_id, region_lease_info.role).into()
-                })
-                .collect::<Vec<_>>()
-        };
-
-        acc.region_lease = Some(RegionLease {
-            regions: renewed,
-            duration_since_epoch: req.duration_since_epoch,
-            lease_seconds: self.region_lease_seconds,
-            closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
-        });
-        acc.inactive_region_ids = non_exists;
+                acc.region_lease = Some(RegionLease {
+                    regions: renewed,
+                    duration_since_epoch: req.duration_since_epoch,
+                    lease_seconds: self.region_lease_seconds,
+                    closeable_region_ids: non_exists.iter().map(|region| region.as_u64()).collect(),
+                });
+                acc.inactive_region_ids = non_exists;
+            }
+            Err(e) => {
+                error!(e; "Failed to renew region leases for datanode: {datanode_id:?}, regions: {:?}", regions);
+                // If we throw error here, the datanode will be marked as failure by region failure handler.
+                // So we only log the error and continue.
+            }
+        }
 
         Ok(HandleControl::Continue)
     }
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 0bcc914e27..5a33dc9c4f 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -373,7 +373,8 @@ impl MetasrvBuilder {
                 runtime_switch_manager.clone(),
                 meta_peer_client.clone(),
                 leader_cached_kv_backend.clone(),
-            );
+            )
+            .with_state(state.clone());
 
             Some(RegionFailureHandler::new(
                 region_supervisor,
diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs
index cfef9158ef..49ed2834cd 100644
--- a/src/meta-srv/src/procedure/region_migration.rs
+++ b/src/meta-srv/src/procedure/region_migration.rs
@@ -41,7 +41,7 @@ use common_meta::key::table_route::TableRouteValue;
 use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
 use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
 use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
-use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
+use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
 use common_meta::peer::Peer;
 use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
 use common_procedure::error::{
@@ -231,8 +231,6 @@ pub struct VolatileContext {
     /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region
     /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue).
     opening_region_guard: Option<OperatingRegionGuard>,
-    /// `table_route` is stored via previous steps for future use.
-    table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
     /// `datanode_table` is stored via previous steps for future use.
     from_peer_datanode_table: Option<DatanodeTableValue>,
     /// `table_info` is stored via previous steps for future use.
@@ -399,29 +397,23 @@ impl Context {
     /// Retry:
     /// - Failed to retrieve the metadata of table.
     pub async fn get_table_route_value(
-        &mut self,
-    ) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
-        let table_route_value = &mut self.volatile_ctx.table_route;
+        &self,
+    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
+        let table_id = self.persistent_ctx.region_id.table_id();
+        let table_route = self
+            .table_metadata_manager
+            .table_route_manager()
+            .table_route_storage()
+            .get_with_raw_bytes(table_id)
+            .await
+            .context(error::TableMetadataManagerSnafu)
+            .map_err(BoxedError::new)
+            .with_context(|_| error::RetryLaterWithSourceSnafu {
+                reason: format!("Failed to get TableRoute: {table_id}"),
+            })?
+            .context(error::TableRouteNotFoundSnafu { table_id })?;
 
-        if table_route_value.is_none() {
-            let table_id = self.persistent_ctx.region_id.table_id();
-            let table_route = self
-                .table_metadata_manager
-                .table_route_manager()
-                .table_route_storage()
-                .get_with_raw_bytes(table_id)
-                .await
-                .context(error::TableMetadataManagerSnafu)
-                .map_err(BoxedError::new)
-                .with_context(|_| error::RetryLaterWithSourceSnafu {
-                    reason: format!("Failed to get TableRoute: {table_id}"),
-                })?
-                .context(error::TableRouteNotFoundSnafu { table_id })?;
-
-            *table_route_value = Some(table_route);
-        }
-
-        Ok(table_route_value.as_ref().unwrap())
+        Ok(table_route)
     }
 
     /// Notifies the RegionSupervisor to register failure detectors of failed region.
@@ -463,12 +455,6 @@
             .await;
     }
 
-    /// Removes the `table_route` of [VolatileContext], returns true if any.
-    pub fn remove_table_route_value(&mut self) -> bool {
-        let value = self.volatile_ctx.table_route.take();
-        value.is_some()
-    }
-
     /// Returns the `table_info` of [VolatileContext] if any.
     /// Otherwise, returns the value retrieved from remote.
     ///
@@ -663,14 +649,13 @@ impl RegionMigrationProcedure {
         })
     }
 
-    async fn rollback_inner(&mut self) -> Result<()> {
+    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
         let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
             .with_label_values(&["rollback"])
             .start_timer();
 
         let table_id = self.context.region_id().table_id();
         let region_id = self.context.region_id();
-        self.context.remove_table_route_value();
         let table_metadata_manager = self.context.table_metadata_manager.clone();
         let table_route = self.context.get_table_route_value().await?;
 
@@ -683,9 +668,11 @@
             .any(|route| route.is_leader_downgrading());
 
         if downgraded {
+            let table_lock = TableLock::Write(region_id.table_id()).into();
+            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
             info!("Rollbacking downgraded region leader table route, region: {region_id}");
             table_metadata_manager
-                .update_leader_region_status(table_id, table_route, |route| {
+                .update_leader_region_status(table_id, &table_route, |route| {
                     if route.region.id == region_id {
                         Some(None)
                     } else {
@@ -712,8 +699,8 @@ impl Procedure for RegionMigrationProcedure {
         Self::TYPE_NAME
     }
 
-    async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
-        self.rollback_inner()
+    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
+        self.rollback_inner(ctx)
             .await
             .map_err(ProcedureError::external)
     }
diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs
index 28633efa56..77e5acbacd 100644
--- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs
@@ -46,7 +46,7 @@ impl UpdateMetadata {
 
         // TODO(weny): ensures the leader region peer is the `from_peer`.
         if let Err(err) = table_metadata_manager
-            .update_leader_region_status(table_id, current_table_route_value, |route| {
+            .update_leader_region_status(table_id, &current_table_route_value, |route| {
                 if route.region.id == region_id
                     && route
                         .leader_peer
@@ -61,7 +61,6 @@ impl UpdateMetadata {
             .await
             .context(error::TableMetadataManagerSnafu)
         {
-            ctx.remove_table_route_value();
             return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
                 reason: format!(
                     "Failed to update the table route during the downgrading leader region, region_id: {region_id}, from_peer_id: {from_peer_id}"
@@ -69,8 +68,6 @@ impl UpdateMetadata {
             });
         }
 
-        ctx.remove_table_route_value();
-
         Ok(())
     }
 }
@@ -81,7 +78,7 @@ mod tests {
 
     use common_meta::key::test_utils::new_test_table_info;
     use common_meta::peer::Peer;
-    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
+    use common_meta::rpc::router::{Region, RegionRoute};
     use store_api::storage::RegionId;
 
     use crate::error::Error;
@@ -115,63 +112,6 @@ mod tests {
         assert!(!err.is_retryable());
     }
 
-    #[tokio::test]
-    async fn test_failed_to_update_table_route_error() {
-        let state = UpdateMetadata::Downgrade;
-        let persistent_context = new_persistent_context();
-        let from_peer = persistent_context.from_peer.clone();
-
-        let env = TestingEnv::new();
-        let mut ctx = env.context_factory().new_context(persistent_context);
-        let table_id = ctx.region_id().table_id();
-
-        let table_info = new_test_table_info(1024, vec![1, 2]).into();
-        let region_routes = vec![
-            RegionRoute {
-                region: Region::new_test(RegionId::new(1024, 1)),
-                leader_peer: Some(from_peer.clone()),
-                ..Default::default()
-            },
-            RegionRoute {
-                region: Region::new_test(RegionId::new(1024, 2)),
-                leader_peer: Some(Peer::empty(4)),
-                ..Default::default()
-            },
-        ];
-
-        env.create_physical_table_metadata(table_info, region_routes)
-            .await;
-
-        let table_metadata_manager = env.table_metadata_manager();
-        let original_table_route = table_metadata_manager
-            .table_route_manager()
-            .table_route_storage()
-            .get_with_raw_bytes(table_id)
-            .await
-            .unwrap()
-            .unwrap();
-
-        // modifies the table route.
-        table_metadata_manager
-            .update_leader_region_status(table_id, &original_table_route, |route| {
-                if route.region.id == RegionId::new(1024, 2) {
-                    Some(Some(LeaderState::Downgrading))
-                } else {
-                    None
-                }
-            })
-            .await
-            .unwrap();
-
-        // sets the old table route.
-        ctx.volatile_ctx.table_route = Some(original_table_route);
-
-        let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err();
-        assert!(ctx.volatile_ctx.table_route.is_none());
-        assert!(err.is_retryable());
-        assert!(format!("{err:?}").contains("Failed to update the table route"));
-    }
-
     #[tokio::test]
     async fn test_only_downgrade_from_peer() {
         let mut state = Box::new(UpdateMetadata::Downgrade);
@@ -212,7 +152,6 @@ mod tests {
         // It should remain unchanged.
         assert_eq!(latest_table_route.version().unwrap(), 0);
         assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
-        assert!(ctx.volatile_ctx.table_route.is_none());
     }
 
     #[tokio::test]
@@ -254,6 +193,5 @@ mod tests {
             .unwrap();
 
         assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgrading());
-        assert!(ctx.volatile_ctx.table_route.is_none());
     }
 }
diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs
index 7c4a7b713e..8f50e14b33 100644
--- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs
@@ -35,7 +35,7 @@ impl UpdateMetadata {
         let current_table_route_value = ctx.get_table_route_value().await?;
 
         if let Err(err) = table_metadata_manager
-            .update_leader_region_status(table_id, current_table_route_value, |route| {
+            .update_leader_region_status(table_id, &current_table_route_value, |route| {
                 if route.region.id == region_id {
                     Some(None)
                 } else {
@@ -45,14 +45,12 @@ impl UpdateMetadata {
             .await
             .context(error::TableMetadataManagerSnafu)
         {
-            ctx.remove_table_route_value();
             return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
                 reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
             });
        }
 
         ctx.register_failure_detectors().await;
-        ctx.remove_table_route_value();
 
         Ok(())
     }
@@ -61,7 +59,6 @@ impl UpdateMetadata {
 #[cfg(test)]
 mod tests {
     use std::assert_matches::assert_matches;
-    use std::sync::Arc;
 
     use common_meta::key::test_utils::new_test_table_info;
     use common_meta::peer::Peer;
@@ -73,7 +70,6 @@ mod tests {
     use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
     use crate::procedure::region_migration::update_metadata::UpdateMetadata;
     use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
-    use crate::region::supervisor::RegionFailureDetectorControl;
     fn new_persistent_context() -> PersistentContext {
         test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
     }
@@ -93,101 +89,6 @@ mod tests {
         assert!(!err.is_retryable());
     }
 
-    #[tokio::test]
-    async fn test_update_table_route_with_retry() {
-        let state = UpdateMetadata::Rollback;
-        let persistent_context = new_persistent_context();
-        let from_peer = persistent_context.from_peer.clone();
-
-        let env = TestingEnv::new();
-        let mut ctx = env.context_factory().new_context(persistent_context);
-        let (tx, mut rx) = tokio::sync::mpsc::channel(8);
-        ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx));
-        let table_id = ctx.region_id().table_id();
-
-        let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
-        let region_routes = vec![
-            RegionRoute {
-                region: Region::new_test(RegionId::new(1024, 1)),
-                leader_peer: Some(from_peer.clone()),
-                leader_state: Some(LeaderState::Downgrading),
-                ..Default::default()
-            },
-            RegionRoute {
-                region: Region::new_test(RegionId::new(1024, 2)),
-                leader_peer: Some(Peer::empty(4)),
-                leader_state: Some(LeaderState::Downgrading),
-                ..Default::default()
-            },
-            RegionRoute {
-                region: Region::new_test(RegionId::new(1024, 3)),
-                leader_peer: Some(Peer::empty(5)),
-                ..Default::default()
-            },
-        ];
-
-        let expected_region_routes = {
-            let mut region_routes = region_routes.clone();
-            region_routes[0].leader_state = None;
-            region_routes[1].leader_state = None;
-            region_routes
-        };
-
-        env.create_physical_table_metadata(table_info, region_routes)
-            .await;
-
-        let table_metadata_manager = env.table_metadata_manager();
-        let old_table_route = table_metadata_manager
-            .table_route_manager()
-            .table_route_storage()
-            .get_with_raw_bytes(table_id)
-            .await
-            .unwrap()
-            .unwrap();
-
-        // modifies the table route.
-        table_metadata_manager
-            .update_leader_region_status(table_id, &old_table_route, |route| {
-                if route.region.id == RegionId::new(1024, 2) {
-                    Some(None)
-                } else {
-                    None
-                }
-            })
-            .await
-            .unwrap();
-
-        ctx.volatile_ctx.table_route = Some(old_table_route);
-
-        let err = state
-            .rollback_downgraded_region(&mut ctx)
-            .await
-            .unwrap_err();
-        assert!(ctx.volatile_ctx.table_route.is_none());
-        assert!(err.is_retryable());
-        assert!(format!("{err:?}").contains("Failed to update the table route"));
-        assert_eq!(rx.len(), 0);
-        state.rollback_downgraded_region(&mut ctx).await.unwrap();
-        let event = rx.try_recv().unwrap();
-        let detecting_regions = event.into_region_failure_detectors();
-        assert_eq!(
-            detecting_regions,
-            vec![(from_peer.id, ctx.persistent_ctx.region_id)]
-        );
-
-        let table_route = table_metadata_manager
-            .table_route_manager()
-            .table_route_storage()
-            .get(table_id)
-            .await
-            .unwrap()
-            .unwrap();
-        assert_eq!(
-            &expected_region_routes,
-            table_route.region_routes().unwrap()
-        );
-    }
-
     #[tokio::test]
     async fn test_next_migration_end_state() {
         let mut state = Box::new(UpdateMetadata::Rollback);
@@ -238,8 +139,6 @@ mod tests {
             .downcast_ref::<RegionMigrationEnd>()
             .unwrap();
 
-        assert!(ctx.volatile_ctx.table_route.is_none());
-
         let table_route = table_metadata_manager
             .table_route_manager()
             .table_route_storage()
diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
index 7352336d86..7e33c9c75c 100644
--- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs
@@ -166,7 +166,7 @@ impl UpdateMetadata {
                     region_options: region_options.clone(),
                     region_wal_options: region_wal_options.clone(),
                 },
-                table_route_value,
+                &table_route_value,
                 region_routes,
                 &region_options,
                 &region_wal_options,
@@ -174,13 +174,11 @@ impl UpdateMetadata {
             .await
             .context(error::TableMetadataManagerSnafu)
         {
-            ctx.remove_table_route_value();
            return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu {
                 reason: format!("Failed to update the table route during the upgrading candidate region: {region_id}"),
             });
         };
 
-        ctx.remove_table_route_value();
         ctx.deregister_failure_detectors().await;
         // Consumes the guard.
         ctx.volatile_ctx.opening_region_guard.take();
@@ -310,71 +308,6 @@ mod tests {
         assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2);
     }
 
-    #[tokio::test]
-    async fn test_failed_to_update_table_route_error() {
-        let state = UpdateMetadata::Upgrade;
-        let env = TestingEnv::new();
-        let persistent_context = new_persistent_context();
-        let mut ctx = env.context_factory().new_context(persistent_context);
-        let opening_keeper = MemoryRegionKeeper::default();
-
-        let table_id = 1024;
-        let table_info = new_test_table_info(table_id, vec![1]).into();
-        let region_routes = vec![
-            RegionRoute {
-                region: Region::new_test(RegionId::new(table_id, 1)),
-                leader_peer: Some(Peer::empty(1)),
-                follower_peers: vec![Peer::empty(5), Peer::empty(3)],
-                leader_state: Some(LeaderState::Downgrading),
-                leader_down_since: Some(current_time_millis()),
-            },
-            RegionRoute {
-                region: Region::new_test(RegionId::new(table_id, 2)),
-                leader_peer: Some(Peer::empty(4)),
-                leader_state: Some(LeaderState::Downgrading),
-                ..Default::default()
-            },
-        ];
-
-        env.create_physical_table_metadata(table_info, region_routes)
-            .await;
-
-        let table_metadata_manager = env.table_metadata_manager();
-        let original_table_route = table_metadata_manager
-            .table_route_manager()
-            .table_route_storage()
-            .get_with_raw_bytes(table_id)
-            .await
-            .unwrap()
-            .unwrap();
-
-        // modifies the table route.
-        table_metadata_manager
-            .update_leader_region_status(table_id, &original_table_route, |route| {
-                if route.region.id == RegionId::new(1024, 2) {
-                    // Removes the status.
-                    Some(None)
-                } else {
-                    None
-                }
-            })
-            .await
-            .unwrap();
-
-        // sets the old table route.
-        ctx.volatile_ctx.table_route = Some(original_table_route);
-        let guard = opening_keeper
-            .register(2, RegionId::new(table_id, 1))
-            .unwrap();
-        ctx.volatile_ctx.opening_region_guard = Some(guard);
-        let err = state.upgrade_candidate_region(&mut ctx).await.unwrap_err();
-
-        assert!(ctx.volatile_ctx.table_route.is_none());
-        assert!(ctx.volatile_ctx.opening_region_guard.is_some());
-        assert!(err.is_retryable());
-        assert!(format!("{err:?}").contains("Failed to update the table route"));
-    }
-
     #[tokio::test]
     async fn test_check_metadata() {
         let state = UpdateMetadata::Upgrade;
@@ -492,7 +425,6 @@ mod tests {
             .unwrap();
         let region_routes = table_route.region_routes().unwrap();
 
-        assert!(ctx.volatile_ctx.table_route.is_none());
         assert!(ctx.volatile_ctx.opening_region_guard.is_none());
         assert_eq!(region_routes.len(), 1);
         assert!(!region_routes[0].is_leader_downgrading());
diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs
index cb198b6787..97acaf4b07 100644
--- a/src/meta-srv/src/region/supervisor.rs
+++ b/src/meta-srv/src/region/supervisor.rs
@@ -52,6 +52,7 @@ use crate::procedure::region_migration::{
 };
 use crate::region::failure_detector::RegionFailureDetector;
 use crate::selector::SelectorOptions;
+use crate::state::StateRef;
 
 /// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
 /// It includes identifiers for the cluster and datanode, a list of regions being monitored,
@@ -100,16 +101,6 @@ pub(crate) enum Event {
     Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
 }
 
-#[cfg(test)]
-impl Event {
-    pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
-        match self {
-            Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
-            _ => unreachable!(),
-        }
-    }
-}
-
 impl Debug for Event {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
@@ -139,6 +130,9 @@ pub struct RegionSupervisorTicker {
     /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
     tick_handle: Mutex<Option<JoinHandle<()>>>,
 
+    /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`].
+    initialization_handler: Mutex<Option<JoinHandle<()>>>,
+
     /// The interval of tick.
     tick_interval: Duration,
 
@@ -182,6 +176,7 @@ impl RegionSupervisorTicker {
         );
         Self {
             tick_handle: Mutex::new(None),
+            initialization_handler: Mutex::new(None),
             tick_interval,
             initialization_delay,
             initialization_retry_period,
@@ -202,7 +197,7 @@ impl RegionSupervisorTicker {
             self.initialization_retry_period,
         );
         initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
-        common_runtime::spawn_global(async move {
+        let initialization_handler = common_runtime::spawn_global(async move {
            loop {
                 initialization_interval.tick().await;
                 let (tx, rx) = oneshot::channel();
@@ -218,6 +213,7 @@ impl RegionSupervisorTicker {
                 }
             }
         });
+        *self.initialization_handler.lock().unwrap() = Some(initialization_handler);
 
         let sender = self.sender.clone();
         let ticker_loop = tokio::spawn(async move {
@@ -247,6 +243,11 @@ impl RegionSupervisorTicker {
             handle.abort();
             info!("The tick loop is stopped.");
         }
+        let initialization_handler = self.initialization_handler.lock().unwrap().take();
+        if let Some(initialization_handler) = initialization_handler {
+            initialization_handler.abort();
+            info!("The initialization loop is stopped.");
+        }
     }
 }
 
@@ -290,6 +291,8 @@ pub struct RegionSupervisor {
     peer_resolver: PeerResolverRef,
     /// The kv backend.
     kv_backend: KvBackendRef,
+    /// The meta state, used to check if the current metasrv is the leader.
+    state: Option<StateRef>,
 }
 
 /// Controller for managing failure detectors for regions.
@@ -373,12 +376,29 @@ impl RegionSupervisor {
             runtime_switch_manager,
             peer_resolver,
             kv_backend,
+            state: None,
         }
     }
 
+    /// Sets the meta state.
+    pub(crate) fn with_state(mut self, state: StateRef) -> Self {
+        self.state = Some(state);
+        self
+    }
+
     /// Runs the main loop.
     pub(crate) async fn run(&mut self) {
         while let Some(event) = self.receiver.recv().await {
+            if let Some(state) = self.state.as_ref()
+                && !state.read().unwrap().is_leader()
+            {
+                warn!(
+                    "The current metasrv is not the leader, ignore {:?} event",
+                    event
+                );
+                continue;
+            }
+
             match event {
                 Event::InitializeAllRegions(sender) => {
                     match self.is_maintenance_mode_enabled().await {
@@ -413,7 +433,10 @@ impl RegionSupervisor {
                     self.deregister_failure_detectors(detecting_regions).await
                 }
                 Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
-                Event::Clear => self.clear(),
+                Event::Clear => {
+                    self.clear();
+                    info!("Region supervisor is initialized.");
+                }
                 #[cfg(test)]
                 Event::Dump(sender) => {
                     let _ = sender.send(self.failure_detector.dump());
@@ -906,6 +929,7 @@ pub(crate) mod tests {
         let (tx, mut rx) = tokio::sync::mpsc::channel(128);
         let ticker = RegionSupervisorTicker {
             tick_handle: Mutex::new(None),
+            initialization_handler: Mutex::new(None),
             tick_interval: Duration::from_millis(10),
             initialization_delay: Duration::from_millis(100),
             initialization_retry_period: Duration::from_millis(100),
@@ -932,6 +956,7 @@ pub(crate) mod tests {
         let (tx, mut rx) = tokio::sync::mpsc::channel(128);
         let ticker = RegionSupervisorTicker {
             tick_handle: Mutex::new(None),
+            initialization_handler: Mutex::new(None),
             tick_interval: Duration::from_millis(1000),
             initialization_delay: Duration::from_millis(50),
             initialization_retry_period: Duration::from_millis(50),
diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs
index 6b63116de6..046318def9 100644
--- a/src/meta-srv/src/service/heartbeat.rs
+++ b/src/meta-srv/src/service/heartbeat.rs
@@ -79,6 +79,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
                 let res = handler_group
                     .handle(req, ctx.clone())
                     .await
+                    .inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", ))
                     .map_err(|e| e.into());
 
                 is_not_leader = res.as_ref().is_ok_and(|r| r.is_not_leader());
diff --git a/src/meta-srv/src/state.rs b/src/meta-srv/src/state.rs
index 4d062c0b2e..e5edc5f169 100644
--- a/src/meta-srv/src/state.rs
+++ b/src/meta-srv/src/state.rs
@@ -75,6 +75,12 @@ impl State {
         })
     }
 
+    /// Returns true if the current state is a leader.
+    pub fn is_leader(&self) -> bool {
+        matches!(self, State::Leader(_))
+    }
+
+    /// Returns true if the leader cache is enabled.
     pub fn enable_leader_cache(&self) -> bool {
         match &self {
             State::Leader(leader) => leader.enable_leader_cache,