feat: downgrade failed region before closing (#2715)

* feat: add update_region_status method

* feat: downgrade failed region before closing

* chore: apply suggestions from CR
Author: Weny Xu
Date: 2023-11-10 14:05:05 +09:00
Committed by: GitHub
Parent: 8fd0766754
Commit: 22ee45f3df
4 changed files with 201 additions and 4 deletions
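
In short: when region failover kicks in, the failed region's leader entry in the table route is flagged RegionStatus::Downgraded before the close instruction is sent to the datanode, so a stale leader can no longer be mistaken for a healthy one while failover is in flight. A minimal sketch of the new call pattern (the failed_region_number binding is hypothetical; the real lookup is in the diffs below):

// Sketch only: flag the failed region's leader as Downgraded.
table_metadata_manager
    .update_leader_region_status(table_id, table_route_value, |route| {
        if route.region.id.region_number() == failed_region_number {
            Some(Some(RegionStatus::Downgraded)) // set the new status
        } else {
            None // leave every other route untouched
        }
    })
    .await?;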


@@ -64,6 +64,7 @@ use std::sync::Arc;
use bytes::Bytes;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_telemetry::warn;
use datanode_table::{DatanodeTableKey, DatanodeTableManager, DatanodeTableValue};
use lazy_static::lazy_static;
use regex::Regex;
@@ -83,7 +84,7 @@ use crate::ddl::utils::region_storage_path;
use crate::error::{self, Result, SerdeJsonSnafu};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
-use crate::rpc::router::{region_distribution, RegionRoute};
+use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus};
use crate::DatanodeId;
pub const REMOVED_PREFIX: &str = "__removed";
@@ -625,6 +626,58 @@ impl TableMetadataManager {
        Ok(())
    }

    /// Updates the leader status of the [RegionRoute]s of the given table.
    pub async fn update_leader_region_status<F>(
        &self,
        table_id: TableId,
        current_table_route_value: DeserializedValueWithBytes<TableRouteValue>,
        next_region_route_status: F,
    ) -> Result<()>
    where
        F: Fn(&RegionRoute) -> Option<Option<RegionStatus>>,
    {
        let mut new_region_routes = current_table_route_value.region_routes.clone();
        let mut updated = 0;
        for route in &mut new_region_routes {
            if let Some(status) = next_region_route_status(route) {
                if route.set_leader_status(status) {
                    updated += 1;
                }
            }
        }
        if updated == 0 {
            warn!("No leader status updated");
            return Ok(());
        }

        // Updates the table_route.
        let new_table_route_value = current_table_route_value.update(new_region_routes);
        let (update_table_route_txn, on_update_table_route_failure) = self
            .table_route_manager()
            .build_update_txn(table_id, &current_table_route_value, &new_table_route_value)?;
        let txn = Txn::merge_all(vec![update_table_route_txn]);
        let r = self.kv_backend.txn(txn).await?;

        // Checks whether the metadata was already updated by a previous attempt.
        if !r.succeeded {
            let remote_table_route = on_update_table_route_failure(&r.responses)?
                .context(error::UnexpectedSnafu {
                    err_msg: "Read an empty table route while updating the leader region status",
                })?
                .into_inner();
            let op_name = "updating the leader region status";
            ensure_values!(remote_table_route, new_table_route_value, op_name);
        }

        Ok(())
    }
}
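
Worth spelling out, because the signature is easy to misread: the closure returns Option<Option<RegionStatus>>, where the outer level decides whether to touch a route at all and the inner level is the new status. A hypothetical closure exercising all three shapes (illustrative only, not part of the diff):

let next_status = |route: &RegionRoute| -> Option<Option<RegionStatus>> {
    if route.leader_peer.is_none() {
        return None; // outer None: skip this route entirely
    }
    match route.leader_status {
        None => Some(Some(RegionStatus::Downgraded)), // flag a healthy leader
        Some(_) => Some(None),                        // inner None: clear an existing flag
    }
};

The compare-and-swap transaction also makes the method idempotent: when the txn loses the race, the remote route is re-read and ensure_values! accepts it if it already equals the intended new value, so a retried failover step does not fail spuriously.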
#[macro_export]
@@ -707,7 +760,7 @@ mod tests {
    use crate::key::{to_removed_key, DeserializedValueWithBytes, TableMetadataManager};
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;
-    use crate::rpc::router::{region_distribution, Region, RegionRoute};
+    use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionStatus};

    #[test]
    fn test_deserialized_value_with_bytes() {
@@ -1002,6 +1055,74 @@
        .is_err())
    }

    #[tokio::test]
    async fn test_update_table_leader_region_status() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let table_metadata_manager = TableMetadataManager::new(mem_kv);
        let datanode = 1;

        let region_routes = vec![
            RegionRoute {
                region: Region {
                    id: 1.into(),
                    name: "r1".to_string(),
                    partition: None,
                    attrs: BTreeMap::new(),
                },
                leader_peer: Some(Peer::new(datanode, "a2")),
                leader_status: Some(RegionStatus::Downgraded),
                follower_peers: vec![],
            },
            RegionRoute {
                region: Region {
                    id: 2.into(),
                    name: "r2".to_string(),
                    partition: None,
                    attrs: BTreeMap::new(),
                },
                leader_peer: Some(Peer::new(datanode, "a1")),
                leader_status: None,
                follower_peers: vec![],
            },
        ];

        let table_info: RawTableInfo =
            new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into();
        let table_id = table_info.ident.table_id;

        let current_table_route_value =
            DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone()));

        // Creates the table metadata.
        table_metadata_manager
            .create_table_metadata(table_info.clone(), region_routes.clone())
            .await
            .unwrap();

        // Downgrades every region whose leader status is not already set.
        table_metadata_manager
            .update_leader_region_status(table_id, current_table_route_value, |region_route| {
                if region_route.leader_status.is_some() {
                    None
                } else {
                    Some(Some(RegionStatus::Downgraded))
                }
            })
            .await
            .unwrap();

        let updated_route_value = table_metadata_manager
            .table_route_manager()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(
            updated_route_value.region_routes[0].leader_status,
            Some(RegionStatus::Downgraded)
        );
        assert_eq!(
            updated_route_value.region_routes[1].leader_status,
            Some(RegionStatus::Downgraded)
        );
    }

    async fn assert_datanode_table(
        table_metadata_manager: &TableMetadataManager,
        table_id: u32,


@@ -275,6 +275,16 @@ impl RegionRoute {
    pub fn downgrade_leader(&mut self) {
        self.leader_status = Some(RegionStatus::Downgraded)
    }

    /// Sets the leader status.
    ///
    /// Returns `true` if the status actually changed.
    pub fn set_leader_status(&mut self, status: Option<RegionStatus>) -> bool {
        let updated = self.leader_status != status;
        self.leader_status = status;
        updated
    }
}

pub struct RegionRoutes(pub Vec<RegionRoute>);
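
The boolean return is what lets update_leader_region_status count real changes and skip the metadata transaction entirely when nothing changed. A quick sketch of the semantics, reusing the route shape from the tests above (all values made up, same imports assumed):

let mut route = RegionRoute {
    region: Region {
        id: 1.into(),
        name: "r1".to_string(),
        partition: None,
        attrs: BTreeMap::new(),
    },
    leader_peer: Some(Peer::new(1, "a1")),
    leader_status: None,
    follower_peers: vec![],
};
assert!(route.set_leader_status(Some(RegionStatus::Downgraded))); // None -> Downgraded: changed
assert!(!route.set_leader_status(Some(RegionStatus::Downgraded))); // same value: no-op
assert!(route.set_leader_status(None)); // Downgraded -> None: changed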


@@ -18,15 +18,16 @@ use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionStatus;
use common_meta::RegionIdent;
use common_telemetry::{debug, info, warn};
use serde::{Deserialize, Serialize};
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
use super::activate_region::ActivateRegion;
use super::{RegionFailoverContext, State};
use crate::error::{
-    Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
+    self, Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::HeartbeatMailbox;
use crate::inactive_region_manager::InactiveRegionManager;
@@ -42,6 +43,35 @@ impl DeactivateRegion {
        Self { candidate }
    }

    /// Marks the failed region's leader as `Downgraded` in the table route metadata.
    async fn mark_leader_downgraded(
        &self,
        ctx: &RegionFailoverContext,
        failed_region: &RegionIdent,
    ) -> Result<()> {
        let table_id = failed_region.table_id;

        let table_route_value = ctx
            .table_metadata_manager
            .table_route_manager()
            .get(table_id)
            .await
            .context(error::TableMetadataManagerSnafu)?
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        ctx.table_metadata_manager
            .update_leader_region_status(table_id, table_route_value, |region| {
                if region.region.id.region_number() == failed_region.region_number {
                    Some(Some(RegionStatus::Downgraded))
                } else {
                    None
                }
            })
            .await
            .context(error::UpdateTableRouteSnafu)?;

        Ok(())
    }
    async fn send_close_region_message(
        &self,
        ctx: &RegionFailoverContext,
@@ -136,6 +166,7 @@ impl State for DeactivateRegion {
        failed_region: &RegionIdent,
    ) -> Result<Box<dyn State>> {
        info!("Deactivating region: {failed_region:?}");

        // Downgrades the leader in metadata first, so the failed region cannot
        // be mistaken for a healthy leader even if the close message is lost.
        self.mark_leader_downgraded(ctx, failed_region).await?;

        let result = self.send_close_region_message(ctx, failed_region).await;
        let mailbox_receiver = match result {
            Ok(mailbox_receiver) => mailbox_receiver,
@@ -164,6 +195,40 @@ mod tests {
    use super::super::tests::TestingEnvBuilder;
    use super::*;

    #[tokio::test]
    async fn test_mark_leader_downgraded() {
        common_telemetry::init_default_ut_logging();
        let env = TestingEnvBuilder::new().build().await;
        let failed_region = env.failed_region(1).await;

        let state = DeactivateRegion::new(Peer::new(2, ""));
        state
            .mark_leader_downgraded(&env.context, &failed_region)
            .await
            .unwrap();

        let table_id = failed_region.table_id;
        let table_route_value = env
            .context
            .table_metadata_manager
            .table_route_manager()
            .get(table_id)
            .await
            .unwrap()
            .unwrap();

        let should_be_downgraded = table_route_value
            .region_routes
            .iter()
            .find(|route| route.region.id.region_number() == failed_region.region_number)
            .unwrap();

        assert!(should_be_downgraded.is_leader_downgraded());
    }

    #[tokio::test]
    async fn test_deactivate_region_success() {
        common_telemetry::init_default_ut_logging();


@@ -86,6 +86,7 @@ impl UpdateRegionMetadata {
        for region_route in new_region_routes.iter_mut() {
            if region_route.region.id.region_number() == failed_region.region_number {
                region_route.leader_peer = Some(self.candidate.clone());
                // The candidate is the leader now; clear the `Downgraded` flag.
                region_route.set_leader_status(None);
                break;
            }
        }
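
Together with mark_leader_downgraded in DeactivateRegion above, this closes the loop: the flag is set when the failed leader is deactivated and cleared here once the candidate peer takes over. Condensed into a hypothetical helper (names from the diffs above; the function itself is not in the commit):

fn failover_status_lifecycle(route: &mut RegionRoute, candidate: Peer) {
    // 1. DeactivateRegion: flag the failed leader before closing the region.
    route.set_leader_status(Some(RegionStatus::Downgraded));

    // 2. UpdateRegionMetadata: promote the candidate and clear the flag.
    route.leader_peer = Some(candidate);
    route.set_leader_status(None);
}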