diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index b774060be7..5970868f1a 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -64,6 +64,7 @@ use std::sync::Arc; use bytes::Bytes; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_telemetry::warn; use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue}; use lazy_static::lazy_static; use regex::Regex; @@ -83,7 +84,7 @@ use crate::ddl::utils::region_storage_path; use crate::error::{self, Result, SerdeJsonSnafu}; use crate::kv_backend::txn::Txn; use crate::kv_backend::KvBackendRef; -use crate::rpc::router::{region_distribution, RegionRoute}; +use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus}; use crate::DatanodeId; pub const REMOVED_PREFIX: &str = "__removed"; @@ -625,6 +626,58 @@ impl TableMetadataManager { Ok(()) } + + /// Updates the leader status of the [RegionRoute]. + pub async fn update_leader_region_status<F>( + &self, + table_id: TableId, + current_table_route_value: DeserializedValueWithBytes<TableRouteValue>, + next_region_route_status: F, + ) -> Result<()> + where + F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>, + { + let mut new_region_routes = current_table_route_value.region_routes.clone(); + + let mut updated = 0; + for route in &mut new_region_routes { + if let Some(status) = next_region_route_status(route) { + if route.set_leader_status(status) { + updated += 1; + } + } + } + + if updated == 0 { + warn!("No leader status updated"); + return Ok(()); + } + + // Updates the table_route. + let new_table_route_value = current_table_route_value.update(new_region_routes); + + let (update_table_route_txn, on_update_table_route_failure) = self + .table_route_manager() + .build_update_txn(table_id, &current_table_route_value, &new_table_route_value)?; + + let txn = Txn::merge_all(vec![update_table_route_txn]); + + let r = self.kv_backend.txn(txn).await?; + + // Checks whether metadata was already updated.
+ if !r.succeeded { + let remote_table_route = on_update_table_route_failure(&r.responses)? + .context(error::UnexpectedSnafu { + err_msg: "Reads the empty table route during the updating leader region status", + })? + .into_inner(); + + let op_name = "the updating leader region status"; + ensure_values!(remote_table_route, new_table_route_value, op_name); + } + + Ok(()) + } } #[macro_export] @@ -707,7 +760,7 @@ mod tests { use crate::key::{to_removed_key, DeserializedValueWithBytes, TableMetadataManager}; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; - use crate::rpc::router::{region_distribution, Region, RegionRoute}; + use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionStatus}; #[test] fn test_deserialized_value_with_bytes() { @@ -1002,6 +1055,74 @@ mod tests { .is_err()) } + #[tokio::test] + async fn test_update_table_leader_region_status() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv); + let datanode = 1; + let region_routes = vec![ + RegionRoute { + region: Region { + id: 1.into(), + name: "r1".to_string(), + partition: None, + attrs: BTreeMap::new(), + }, + leader_peer: Some(Peer::new(datanode, "a2")), + leader_status: Some(RegionStatus::Downgraded), + follower_peers: vec![], + }, + RegionRoute { + region: Region { + id: 2.into(), + name: "r2".to_string(), + partition: None, + attrs: BTreeMap::new(), + }, + leader_peer: Some(Peer::new(datanode, "a1")), + leader_status: None, + follower_peers: vec![], + }, + ]; + let table_info: RawTableInfo = + new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); + let table_id = table_info.ident.table_id; + let current_table_route_value = + DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone())); + // creates metadata. 
+ table_metadata_manager + .create_table_metadata(table_info.clone(), region_routes.clone()) + .await + .unwrap(); + + table_metadata_manager + .update_leader_region_status(table_id, current_table_route_value, |region_route| { + if region_route.leader_status.is_some() { + None + } else { + Some(Some(RegionStatus::Downgraded)) + } + }) + .await + .unwrap(); + + let updated_route_value = table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + assert_eq!( + updated_route_value.region_routes[0].leader_status, + Some(RegionStatus::Downgraded) + ); + assert_eq!( + updated_route_value.region_routes[1].leader_status, + Some(RegionStatus::Downgraded) + ); + } + async fn assert_datanode_table( table_metadata_manager: &TableMetadataManager, table_id: u32, diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index e7c1d1bb05..6eca316cd4 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -275,6 +275,16 @@ impl RegionRoute { pub fn downgrade_leader(&mut self) { self.leader_status = Some(RegionStatus::Downgraded) } + + /// Sets the leader status. + /// + /// Returns true if updated. 
+ pub fn set_leader_status(&mut self, status: Option<RegionStatus>) -> bool { + let updated = self.leader_status != status; + + self.leader_status = status; + updated + } } pub struct RegionRoutes(pub Vec<RegionRoute>); diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index c3d177cce9..d24ae9f68b 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -18,15 +18,16 @@ use api::v1::meta::MailboxMessage; use async_trait::async_trait; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::peer::Peer; +use common_meta::rpc::router::RegionStatus; use common_meta::RegionIdent; use common_telemetry::{debug, info, warn}; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use super::activate_region::ActivateRegion; use super::{RegionFailoverContext, State}; use crate::error::{ - Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, + self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu, }; use crate::handler::HeartbeatMailbox; use crate::inactive_region_manager::InactiveRegionManager; @@ -42,6 +43,35 @@ impl DeactivateRegion { Self { candidate } } + async fn mark_leader_downgraded( + &self, + ctx: &RegionFailoverContext, + failed_region: &RegionIdent, + ) -> Result<()> { + let table_id = failed_region.table_id; + + let table_route_value = ctx + .table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .context(error::TableMetadataManagerSnafu)?
.context(error::TableRouteNotFoundSnafu { table_id })?; + + ctx.table_metadata_manager + .update_leader_region_status(table_id, table_route_value, |region| { + if region.region.id.region_number() == failed_region.region_number { + Some(Some(RegionStatus::Downgraded)) + } else { + None + } + }) + .await + .context(error::UpdateTableRouteSnafu)?; + + Ok(()) + } + async fn send_close_region_message( &self, ctx: &RegionFailoverContext, @@ -136,6 +166,7 @@ impl State for DeactivateRegion { failed_region: &RegionIdent, ) -> Result<Box<dyn State>> { info!("Deactivating region: {failed_region:?}"); + self.mark_leader_downgraded(ctx, failed_region).await?; let result = self.send_close_region_message(ctx, failed_region).await; let mailbox_receiver = match result { Ok(mailbox_receiver) => mailbox_receiver, @@ -164,6 +195,40 @@ mod tests { use super::super::tests::TestingEnvBuilder; use super::*; + #[tokio::test] + async fn test_mark_leader_downgraded() { + common_telemetry::init_default_ut_logging(); + + let env = TestingEnvBuilder::new().build().await; + let failed_region = env.failed_region(1).await; + + let state = DeactivateRegion::new(Peer::new(2, "")); + + state + .mark_leader_downgraded(&env.context, &failed_region) + .await + .unwrap(); + + let table_id = failed_region.table_id; + + let table_route_value = env + .context + .table_metadata_manager + .table_route_manager() + .get(table_id) + .await + .unwrap() + .unwrap(); + + let should_downgraded = table_route_value + .region_routes + .iter() + .find(|route| route.region.id.region_number() == failed_region.region_number) + .unwrap(); + + assert!(should_downgraded.is_leader_downgraded()); + } + #[tokio::test] async fn test_deactivate_region_success() { common_telemetry::init_default_ut_logging(); diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 9d1abc6d64..22c1a89b54 100644 ---
a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -86,6 +86,7 @@ impl UpdateRegionMetadata { for region_route in new_region_routes.iter_mut() { if region_route.region.id.region_number() == failed_region.region_number { region_route.leader_peer = Some(self.candidate.clone()); + region_route.set_leader_status(None); break; } }