From 7e1ba49d3de3fdb5c666ad09658cb631afe12aba Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 28 Mar 2025 16:10:30 +0800 Subject: [PATCH] refactor: remove useless region follower legacy code (#5795) --- src/meta-srv/src/procedure/region_follower.rs | 229 --------------- .../region_follower/add_region_follower.rs | 264 ------------------ .../region_follower/remove_region_follower.rs | 252 ----------------- 3 files changed, 745 deletions(-) delete mode 100644 src/meta-srv/src/procedure/region_follower.rs delete mode 100644 src/meta-srv/src/procedure/region_follower/add_region_follower.rs delete mode 100644 src/meta-srv/src/procedure/region_follower/remove_region_follower.rs diff --git a/src/meta-srv/src/procedure/region_follower.rs b/src/meta-srv/src/procedure/region_follower.rs deleted file mode 100644 index e383d6f567..0000000000 --- a/src/meta-srv/src/procedure/region_follower.rs +++ /dev/null @@ -1,229 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub mod manager; - -pub mod add_region_follower; -mod create; -mod remove; -pub mod remove_region_follower; -#[cfg(test)] -mod test_util; - -use common_error::ext::BoxedError; -use common_meta::cache_invalidator::CacheInvalidatorRef; -use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo}; -use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; -use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; -use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; -use common_meta::peer::Peer; -use common_meta::{distributed_time_constants, DatanodeId}; -use common_procedure::StringKey; -use serde::{Deserialize, Serialize}; -use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::RegionId; -use strum::AsRefStr; - -use crate::cluster::MetaPeerClientRef; -use crate::error::{self, Result}; -use crate::lease::lookup_datanode_peer; -use crate::service::mailbox::MailboxRef; - -#[derive(Clone)] -/// The context of add/remove region follower procedure. -pub struct Context { - /// The table metadata manager. - pub table_metadata_manager: TableMetadataManagerRef, - /// The mailbox. - pub mailbox: MailboxRef, - /// The metasrv's address. - pub server_addr: String, - /// The cache invalidator. - pub cache_invalidator: CacheInvalidatorRef, - /// The meta peer client. - pub meta_peer_client: MetaPeerClientRef, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct AlterRegionFollowerData { - /// The catalog name. - pub(crate) catalog: String, - /// The schema name. - pub(crate) schema: String, - /// The region id. - pub(crate) region_id: RegionId, - /// The peer id of the datanode to add region follower. - pub(crate) peer_id: u64, - /// The peer of the datanode to add region follower. - pub(crate) peer: Option, - /// The datanode table value of the region. - pub(crate) datanode_table_value: Option, - /// The physical table route of the region. - pub(crate) table_route: Option<( - DeserializedValueWithBytes, - PhysicalTableRouteValue, - )>, - /// The state. - pub(crate) state: AlterRegionFollowerState, -} - -impl AlterRegionFollowerData { - pub fn lock_key(&self) -> Vec { - let region_id = self.region_id; - let lock_key = vec![ - CatalogLock::Read(&self.catalog).into(), - SchemaLock::read(&self.catalog, &self.schema).into(), - // The optimistic updating of table route is not working very well, - // so we need to use the write lock here. - TableLock::Write(region_id.table_id()).into(), - RegionLock::Write(region_id).into(), - ]; - - lock_key - } - - pub(crate) fn datanode_peer(&self) -> Option<&Peer> { - self.peer.as_ref() - } - - pub(crate) fn physical_table_route(&self) -> Option<&PhysicalTableRouteValue> { - self.table_route - .as_ref() - .map(|(_, table_route)| table_route) - } - - /// Returns the region info of the region. - pub(crate) fn region_info(&self) -> Option { - self.datanode_table_value - .as_ref() - .map(|datanode_table_value| datanode_table_value.region_info.clone()) - } - - /// Loads the datanode peer. - pub(crate) async fn load_datanode_peer(&self, ctx: &Context) -> Result> { - let peer = lookup_datanode_peer( - self.peer_id, - &ctx.meta_peer_client, - distributed_time_constants::DATANODE_LEASE_SECS, - ) - .await? - .context(error::PeerUnavailableSnafu { - peer_id: self.peer_id, - })?; - - Ok(Some(peer)) - } - - /// Loads the datanode table value of the region. - pub(crate) async fn load_datanode_table_value( - &self, - ctx: &Context, - datanode_id: DatanodeId, - ) -> Result> { - let table_id = self.region_id.table_id(); - let datanode_table_key = DatanodeTableKey { - datanode_id, - table_id, - }; - - let datanode_table_value = ctx - .table_metadata_manager - .datanode_table_manager() - .get(&datanode_table_key) - .await - .context(error::TableMetadataManagerSnafu) - .map_err(BoxedError::new) - .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"), - })? - .context(error::DatanodeTableNotFoundSnafu { - table_id, - datanode_id, - })?; - - Ok(Some(datanode_table_value)) - } - - /// Loads the table route of the region, returns the physical table id. - pub(crate) async fn load_table_route( - &self, - ctx: &Context, - ) -> Result< - Option<( - DeserializedValueWithBytes, - PhysicalTableRouteValue, - )>, - > { - let table_id = self.region_id.table_id(); - let raw_table_route = ctx - .table_metadata_manager - .table_route_manager() - .table_route_storage() - .get_with_raw_bytes(table_id) - .await - .context(error::TableMetadataManagerSnafu) - .map_err(BoxedError::new) - .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!("Failed to get TableRoute: {table_id}"), - })? - .context(error::TableRouteNotFoundSnafu { table_id })?; - let table_route = raw_table_route.clone().into_inner(); - - ensure!( - table_route.is_physical(), - error::LogicalTableCannotAddFollowerSnafu { table_id } - ); - - Ok(Some(( - raw_table_route, - table_route.into_physical_table_route(), - ))) - } -} - -#[derive(Debug, Serialize, Deserialize, AsRefStr)] -pub enum AlterRegionFollowerState { - /// Prepares to alter region follower. - Prepare, - /// Sends alter region follower request to Datanode. - SubmitRequest, - /// Updates table metadata. - UpdateMetadata, - /// Broadcasts the invalidate table route cache message. - InvalidateTableCache, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_data_serialization() { - let data = AlterRegionFollowerData { - catalog: "test_catalog".to_string(), - schema: "test_schema".to_string(), - region_id: RegionId::new(1, 1), - peer_id: 1, - peer: None, - datanode_table_value: None, - table_route: None, - state: AlterRegionFollowerState::Prepare, - }; - - assert_eq!(data.region_id.as_u64(), 4294967297); - let serialized = serde_json::to_string(&data).unwrap(); - let expected = r#"{"catalog":"test_catalog","schema":"test_schema","region_id":4294967297,"peer_id":1,"peer":null,"datanode_table_value":null,"table_route":null,"state":"Prepare"}"#; - assert_eq!(expected, serialized); - } -} diff --git a/src/meta-srv/src/procedure/region_follower/add_region_follower.rs b/src/meta-srv/src/procedure/region_follower/add_region_follower.rs deleted file mode 100644 index e79a177037..0000000000 --- a/src/meta-srv/src/procedure/region_follower/add_region_follower.rs +++ /dev/null @@ -1,264 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_meta::instruction::CacheIdent; -use common_procedure::error::ToJsonSnafu; -use common_procedure::{ - Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, - Result as ProcedureResult, Status, -}; -use common_telemetry::info; -use snafu::ResultExt; -use store_api::storage::RegionId; - -use super::create::CreateFollower; -use super::{AlterRegionFollowerData, AlterRegionFollowerState, Context}; -use crate::error::{self, Result}; -use crate::metrics; - -/// The procedure to add a region follower. -pub struct AddRegionFollowerProcedure { - pub data: AlterRegionFollowerData, - pub context: Context, -} - -impl AddRegionFollowerProcedure { - pub const TYPE_NAME: &'static str = "metasrv-procedure::AddRegionFollower"; - - pub fn new( - catalog: String, - schema: String, - region_id: RegionId, - peer_id: u64, - context: Context, - ) -> Self { - Self { - data: AlterRegionFollowerData { - catalog, - schema, - region_id, - peer_id, - peer: None, - datanode_table_value: None, - table_route: None, - state: AlterRegionFollowerState::Prepare, - }, - context, - } - } - - pub fn from_json(json: &str, context: Context) -> ProcedureResult { - let data: AlterRegionFollowerData = serde_json::from_str(json).unwrap(); - Ok(Self { data, context }) - } - - pub async fn on_prepare(&mut self) -> Result { - // loads the datanode peer and check peer is alive - self.data.peer = self.data.load_datanode_peer(&self.context).await?; - - // loads the table route of the region - self.data.table_route = self.data.load_table_route(&self.context).await?; - let region_leader_datanode_id = { - let table_route = self.data.physical_table_route().unwrap(); - table_route - .region_routes - .iter() - .find(|region_route| region_route.region.id == self.data.region_id) - .map(|region_route| region_route.leader_peer.as_ref().unwrap().id) - .unwrap_or_default() - }; - - // loads the datanode table value - self.data.datanode_table_value = self - .data - .load_datanode_table_value(&self.context, region_leader_datanode_id) - .await?; - - let table_route = self.data.physical_table_route().unwrap(); - - let datanode_peer = self.data.datanode_peer().unwrap(); - - // check if the destination peer is already a leader/follower of the region - for region_route in &table_route.region_routes { - if region_route.region.id != self.data.region_id { - continue; - } - let Some(leader_peer) = ®ion_route.leader_peer else { - continue; - }; - - // check if the destination peer is already a leader of the region - if leader_peer.id == datanode_peer.id { - return error::RegionFollowerLeaderConflictSnafu { - region_id: self.data.region_id, - peer_id: datanode_peer.id, - } - .fail(); - } - - // check if the destination peer is already a follower of the region - if region_route - .follower_peers - .iter() - .any(|peer| peer.id == datanode_peer.id) - { - return error::MultipleRegionFollowersOnSameNodeSnafu { - region_id: self.data.region_id, - peer_id: datanode_peer.id, - } - .fail(); - } - } - - info!( - "Add region({}) follower procedure is preparing, peer: {datanode_peer:?}", - self.data.region_id - ); - self.data.state = AlterRegionFollowerState::SubmitRequest; - - Ok(Status::executing(true)) - } - - pub async fn on_submit_request(&mut self) -> Result { - let region_id = self.data.region_id; - // Safety: we have already set the peer in `on_prepare``. - let peer = self.data.peer.clone().unwrap(); - let create_follower = CreateFollower::new(region_id, peer); - let instruction = create_follower - .build_open_region_instruction(self.data.region_info().unwrap()) - .await?; - create_follower - .send_open_region_instruction(&self.context, instruction) - .await?; - self.data.state = AlterRegionFollowerState::UpdateMetadata; - - Ok(Status::executing(true)) - } - - pub async fn on_update_metadata(&mut self) -> Result { - // Safety: we have already load the table route in `on_prepare``. - let (current_table_route_value, phy_table_route) = self.data.table_route.as_ref().unwrap(); - - let mut new_region_routes = phy_table_route.region_routes.clone(); - for region_route in &mut new_region_routes { - if region_route.region.id != self.data.region_id { - continue; - } - region_route - .follower_peers - .push(self.data.peer.clone().unwrap()); - } - - // Safety: we have already load the region info in `on_prepare`. - let region_info = self.data.region_info().unwrap(); - let new_region_options = region_info.region_options.clone(); - let new_region_wal_options = region_info.region_wal_options.clone(); - - self.context - .table_metadata_manager - .update_table_route( - self.data.region_id.table_id(), - region_info, - current_table_route_value, - new_region_routes, - &new_region_options, - &new_region_wal_options, - ) - .await - .context(error::TableMetadataManagerSnafu)?; - self.data.state = AlterRegionFollowerState::InvalidateTableCache; - - Ok(Status::executing(true)) - } - - pub async fn on_broadcast(&mut self) -> Result { - let table_id = self.data.region_id.table_id(); - // ignore the result - let ctx = common_meta::cache_invalidator::Context::default(); - let _ = self - .context - .cache_invalidator - .invalidate(&ctx, &[CacheIdent::TableId(table_id)]) - .await; - Ok(Status::done()) - } -} - -#[async_trait::async_trait] -impl Procedure for AddRegionFollowerProcedure { - fn type_name(&self) -> &str { - Self::TYPE_NAME - } - - async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let state = &self.data.state; - - let _timer = metrics::METRIC_META_ADD_REGION_FOLLOWER_EXECUTE - .with_label_values(&[state.as_ref()]) - .start_timer(); - - match state { - AlterRegionFollowerState::Prepare => self.on_prepare().await, - AlterRegionFollowerState::SubmitRequest => self.on_submit_request().await, - AlterRegionFollowerState::UpdateMetadata => self.on_update_metadata().await, - AlterRegionFollowerState::InvalidateTableCache => self.on_broadcast().await, - } - .map_err(|e| { - if e.is_retryable() { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) - } - }) - } - - fn dump(&self) -> ProcedureResult { - serde_json::to_string(&self.data).context(ToJsonSnafu) - } - - fn lock_key(&self) -> LockKey { - LockKey::new(self.data.lock_key()) - } -} - -#[cfg(test)] -mod tests { - use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; - - use super::*; - use crate::procedure::region_follower::test_util::TestingEnv; - - #[tokio::test] - async fn test_lock_key() { - let env = TestingEnv::new(); - let context = env.new_context(); - - let procedure = AddRegionFollowerProcedure::new( - "test_catalog".to_string(), - "test_schema".to_string(), - RegionId::new(1, 1), - 1, - context, - ); - - let key = procedure.lock_key(); - let keys = key.keys_to_lock().cloned().collect::>(); - - assert_eq!(keys.len(), 4); - assert!(keys.contains(&CatalogLock::Read("test_catalog").into())); - assert!(keys.contains(&SchemaLock::read("test_catalog", "test_schema").into())); - assert!(keys.contains(&TableLock::Write(1).into())); - assert!(keys.contains(&RegionLock::Write(RegionId::new(1, 1)).into())); - } -} diff --git a/src/meta-srv/src/procedure/region_follower/remove_region_follower.rs b/src/meta-srv/src/procedure/region_follower/remove_region_follower.rs deleted file mode 100644 index c83ddb6c6c..0000000000 --- a/src/meta-srv/src/procedure/region_follower/remove_region_follower.rs +++ /dev/null @@ -1,252 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_meta::instruction::CacheIdent; -use common_procedure::error::ToJsonSnafu; -use common_procedure::{ - Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, - Result as ProcedureResult, Status, -}; -use common_telemetry::info; -use snafu::{ensure, ResultExt}; -use store_api::storage::RegionId; - -use super::remove::RemoveFollower; -use super::{AlterRegionFollowerData, AlterRegionFollowerState, Context}; -use crate::error::{self, Result}; -use crate::metrics; - -/// The procedure to remove a region follower. -pub struct RemoveRegionFollowerProcedure { - pub data: AlterRegionFollowerData, - pub context: Context, -} - -impl RemoveRegionFollowerProcedure { - pub const TYPE_NAME: &'static str = "metasrv-procedure::RemoveRegionFollower"; - - pub fn new( - catalog: String, - schema: String, - region_id: RegionId, - peer_id: u64, - context: Context, - ) -> Self { - Self { - data: AlterRegionFollowerData { - catalog, - schema, - region_id, - peer_id, - peer: None, - datanode_table_value: None, - table_route: None, - state: AlterRegionFollowerState::Prepare, - }, - context, - } - } - - pub fn from_json(json: &str, context: Context) -> ProcedureResult { - let data: AlterRegionFollowerData = serde_json::from_str(json).unwrap(); - Ok(Self { data, context }) - } - - pub async fn on_prepare(&mut self) -> Result { - // loads the datanode peer and check peer is alive - self.data.peer = self.data.load_datanode_peer(&self.context).await?; - - // loads the table route of the region - self.data.table_route = self.data.load_table_route(&self.context).await?; - - // loads the table route of the region - self.data.table_route = self.data.load_table_route(&self.context).await?; - let region_leader_datanode_id = { - let table_route = self.data.physical_table_route().unwrap(); - table_route - .region_routes - .iter() - .find(|region_route| region_route.region.id == self.data.region_id) - .map(|region_route| region_route.leader_peer.as_ref().unwrap().id) - .unwrap_or_default() - }; - - // loads the datanode table value - self.data.datanode_table_value = self - .data - .load_datanode_table_value(&self.context, region_leader_datanode_id) - .await?; - - let table_route = self.data.physical_table_route().unwrap(); - - // check if the destination peer has this region - for region_route in &table_route.region_routes { - if region_route.region.id != self.data.region_id { - continue; - } - ensure!( - !region_route - .follower_peers - .iter() - .any(|peer| peer.id == self.data.peer_id), - error::RegionFollowerNotExistsSnafu { - region_id: self.data.region_id, - peer_id: self.data.peer_id, - } - ); - } - - info!( - "Remove region({}) follower procedure is preparing, peer: {:?}", - self.data.region_id, - self.data.datanode_peer() - ); - self.data.state = AlterRegionFollowerState::SubmitRequest; - - Ok(Status::executing(true)) - } - - pub async fn on_submit_request(&mut self) -> Result { - let region_id = self.data.region_id; - // Safety: we have already set the peer in `on_prepare``. - let peer = self.data.peer.clone().unwrap(); - let remove_follower = RemoveFollower::new(region_id, peer); - let instruction = remove_follower - .build_close_region_instruction(self.data.region_info().unwrap()) - .await?; - remove_follower - .send_close_region_instruction(&self.context, instruction) - .await?; - self.data.state = AlterRegionFollowerState::UpdateMetadata; - - Ok(Status::executing(true)) - } - - pub async fn on_update_metadata(&mut self) -> Result { - // Safety: we have already load the table route in `on_prepare``. - let (current_table_route_value, phy_table_route) = self.data.table_route.as_ref().unwrap(); - - let mut new_region_routes = phy_table_route.region_routes.clone(); - for region_route in &mut new_region_routes { - if region_route.region.id != self.data.region_id { - continue; - } - // remove the follower peer from the region route - region_route - .follower_peers - .retain(|peer| peer.id != self.data.peer_id); - } - - // Safety: we have already load the region info in `on_prepare`. - let region_info = self.data.region_info().unwrap(); - let new_region_options = region_info.region_options.clone(); - let new_region_wal_options = region_info.region_wal_options.clone(); - - self.context - .table_metadata_manager - .update_table_route( - self.data.region_id.table_id(), - region_info, - current_table_route_value, - new_region_routes, - &new_region_options, - &new_region_wal_options, - ) - .await - .context(error::TableMetadataManagerSnafu)?; - self.data.state = AlterRegionFollowerState::InvalidateTableCache; - - Ok(Status::executing(true)) - } - - pub async fn on_broadcast(&mut self) -> Result { - let table_id = self.data.region_id.table_id(); - // ignore the result - let ctx = common_meta::cache_invalidator::Context::default(); - let _ = self - .context - .cache_invalidator - .invalidate(&ctx, &[CacheIdent::TableId(table_id)]) - .await; - Ok(Status::done()) - } -} - -#[async_trait::async_trait] -impl Procedure for RemoveRegionFollowerProcedure { - fn type_name(&self) -> &str { - Self::TYPE_NAME - } - - async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let state = &self.data.state; - - let _timer = metrics::METRIC_META_REMOVE_REGION_FOLLOWER_EXECUTE - .with_label_values(&[state.as_ref()]) - .start_timer(); - - match state { - AlterRegionFollowerState::Prepare => self.on_prepare().await, - AlterRegionFollowerState::SubmitRequest => self.on_submit_request().await, - AlterRegionFollowerState::UpdateMetadata => self.on_update_metadata().await, - AlterRegionFollowerState::InvalidateTableCache => self.on_broadcast().await, - } - .map_err(|e| { - if e.is_retryable() { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) - } - }) - } - - fn dump(&self) -> ProcedureResult { - serde_json::to_string(&self.data).context(ToJsonSnafu) - } - - fn lock_key(&self) -> LockKey { - LockKey::new(self.data.lock_key()) - } -} - -#[cfg(test)] -mod tests { - use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; - - use super::*; - use crate::procedure::region_follower::test_util::TestingEnv; - - #[tokio::test] - async fn test_lock_key() { - let env = TestingEnv::new(); - let context = env.new_context(); - - let procedure = RemoveRegionFollowerProcedure::new( - "test_catalog".to_string(), - "test_schema".to_string(), - RegionId::new(1, 1), - 1, - context, - ); - - let key = procedure.lock_key(); - let keys = key.keys_to_lock().cloned().collect::>(); - - assert_eq!(keys.len(), 4); - assert!(keys.contains(&CatalogLock::Read("test_catalog").into())); - assert!(keys.contains(&SchemaLock::read("test_catalog", "test_schema").into())); - assert!(keys.contains(&TableLock::Write(1).into())); - assert!(keys.contains(&RegionLock::Write(RegionId::new(1, 1)).into())); - } -}