From 1d87bd2d4347a729456026d91a9174ed73e2f00d Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 11 Mar 2025 16:44:50 +0800 Subject: [PATCH] feat: alter region follower (#5676) * feat: add region follower manager * feat: add region procedure * refactor: make add, remove follower procedure look nice * feat: add region follower procedure * chore: undo some change, possibly made by AI * feat: on prepare checking * feat: on update metadata * feat: on broadcast * chore: unit test * feat: add remove follower operation * feat: add or remove region follower procedure * chore: ut * chore: rename * chore: by comment * chore: by comment --------- Co-authored-by: jeremy --- Cargo.lock | 1 + src/common/meta/src/rpc/procedure.rs | 18 ++ src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/error.rs | 39 +++ src/meta-srv/src/metasrv/builder.rs | 16 ++ src/meta-srv/src/metrics.rs | 7 +- src/meta-srv/src/procedure.rs | 3 + src/meta-srv/src/procedure/region_follower.rs | 229 ++++++++++++++++ .../region_follower/add_region_follower.rs | 247 +++++++++++++++++ .../src/procedure/region_follower/create.rs | 252 ++++++++++++++++++ .../src/procedure/region_follower/manager.rs | 193 ++++++++++++++ .../src/procedure/region_follower/remove.rs | 234 ++++++++++++++++ .../region_follower/remove_region_follower.rs | 233 ++++++++++++++++ .../procedure/region_follower/test_util.rs | 97 +++++++ .../src/procedure/region_migration.rs | 3 + .../close_downgraded_region.rs | 2 +- .../downgrade_leader_region.rs | 9 +- .../region_migration/open_candidate_region.rs | 9 +- .../procedure/region_migration/test_util.rs | 148 +--------- .../upgrade_candidate_region.rs | 9 +- src/meta-srv/src/procedure/test_util.rs | 158 +++++++++++ 21 files changed, 1751 insertions(+), 157 deletions(-) create mode 100644 src/meta-srv/src/procedure/region_follower.rs create mode 100644 src/meta-srv/src/procedure/region_follower/add_region_follower.rs create mode 100644 src/meta-srv/src/procedure/region_follower/create.rs create mode 
100644 src/meta-srv/src/procedure/region_follower/manager.rs create mode 100644 src/meta-srv/src/procedure/region_follower/remove.rs create mode 100644 src/meta-srv/src/procedure/region_follower/remove_region_follower.rs create mode 100644 src/meta-srv/src/procedure/region_follower/test_util.rs create mode 100644 src/meta-srv/src/procedure/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index 0d08e64ed6..6f5ecef85e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6717,6 +6717,7 @@ dependencies = [ "session", "snafu 0.8.5", "store-api", + "strum 0.25.0", "table", "tokio", "tokio-postgres", diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs index 2e25a4aa5d..7dd51593d1 100644 --- a/src/common/meta/src/rpc/procedure.rs +++ b/src/common/meta/src/rpc/procedure.rs @@ -34,6 +34,24 @@ pub struct MigrateRegionRequest { pub timeout: Duration, } +/// A request to add region follower. +#[derive(Debug, Clone)] +pub struct AddRegionFollowerRequest { + /// The region id to add follower. + pub region_id: u64, + /// The peer id to add follower. + pub peer_id: u64, +} + +/// A request to remove region follower. +#[derive(Debug, Clone)] +pub struct RemoveRegionFollowerRequest { + /// The region id to remove follower. + pub region_id: u64, + /// The peer id to remove follower. + pub peer_id: u64, +} + /// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`]. 
pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result { ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| { diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 4b7f2137d6..d864fa5c7d 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -61,6 +61,7 @@ serde_json.workspace = true servers.workspace = true snafu.workspace = true store-api.workspace = true +strum.workspace = true table.workspace = true tokio.workspace = true tokio-postgres = { workspace = true, optional = true, features = ["with-chrono-0_4"] } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index ab86665c01..d309cb164c 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -749,6 +749,41 @@ pub enum Error { location: Location, source: common_meta::error::Error, }, + + #[snafu(display("Logical table cannot add follower: {table_id}"))] + LogicalTableCannotAddFollower { + table_id: TableId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "A region follower cannot be placed on the same node as the leader: {region_id}, {peer_id}" + ))] + RegionFollowerLeaderConflict { + region_id: RegionId, + peer_id: u64, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Multiple region followers cannot be placed on the same node: {region_id}, {peer_id}" + ))] + MultipleRegionFollowersOnSameNode { + region_id: RegionId, + peer_id: u64, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Region follower not exists: {region_id}, {peer_id}"))] + RegionFollowerNotExists { + region_id: RegionId, + peer_id: u64, + #[snafu(implicit)] + location: Location, + }, } impl Error { @@ -818,6 +853,10 @@ impl ErrorExt for Error { | Error::ProcedureNotFound { .. } | Error::TooManyPartitions { .. } | Error::TomlFormat { .. } + | Error::LogicalTableCannotAddFollower { .. } + | Error::RegionFollowerLeaderConflict { .. } + | Error::MultipleRegionFollowersOnSameNode { .. 
} + | Error::RegionFollowerNotExists { .. } | Error::HandlerNotFound { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index cd01d14883..dc275e32ae 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -55,6 +55,8 @@ use crate::lease::MetaPeerLookupService; use crate::metasrv::{ ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; +use crate::procedure::region_follower::manager::RegionFollowerManager; +use crate::procedure::region_follower::Context as ArfContext; use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; use crate::region::supervisor::{ @@ -292,6 +294,7 @@ impl MetasrvBuilder { (Arc::new(NoopRegionFailureDetectorControl) as _, None as _) }; + // region migration manager let region_migration_manager = Arc::new(RegionMigrationManager::new( procedure_manager.clone(), DefaultContextFactory::new( @@ -342,6 +345,19 @@ impl MetasrvBuilder { .context(error::InitDdlManagerSnafu)?, ); + // alter region follower manager + let region_follower_manager = Arc::new(RegionFollowerManager::new( + procedure_manager.clone(), + ArfContext { + table_metadata_manager: table_metadata_manager.clone(), + mailbox: mailbox.clone(), + server_addr: options.server_addr.clone(), + cache_invalidator: cache_invalidator.clone(), + meta_peer_client: meta_peer_client.clone(), + }, + )); + region_follower_manager.try_start()?; + let handler_group_builder = match handler_group_builder { Some(handler_group_builder) => handler_group_builder, None => { diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index 1ed34bcd3b..9160aa1e1d 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -60,5 +60,10 @@ lazy_static! { /// The migration fail counter. 
pub static ref METRIC_META_REGION_MIGRATION_FAIL: IntCounter = register_int_counter!("greptime_meta_region_migration_fail", "meta region migration fail").unwrap(); - + /// The add region follower execute histogram. + pub static ref METRIC_META_ADD_REGION_FOLLOWER_EXECUTE: HistogramVec = + register_histogram_vec!("greptime_meta_add_region_follower_execute", "meta add region follower execute", &["state"]).unwrap(); + /// The remove region follower execute histogram. + pub static ref METRIC_META_REMOVE_REGION_FOLLOWER_EXECUTE: HistogramVec = + register_histogram_vec!("greptime_meta_remove_region_follower_execute", "meta remove region follower execute", &["state"]).unwrap(); } diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index 8e696723e8..9185fd05ab 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -18,8 +18,11 @@ use common_meta::leadership_notifier::LeadershipChangeListener; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; +pub mod region_follower; pub mod region_migration; #[cfg(test)] +mod test_util; +#[cfg(test)] mod tests; pub mod utils; diff --git a/src/meta-srv/src/procedure/region_follower.rs b/src/meta-srv/src/procedure/region_follower.rs new file mode 100644 index 0000000000..c0832a6686 --- /dev/null +++ b/src/meta-srv/src/procedure/region_follower.rs @@ -0,0 +1,229 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod manager; + +pub mod add_region_follower; +mod create; +mod remove; +pub mod remove_region_follower; +#[cfg(test)] +mod test_util; + +use common_error::ext::BoxedError; +use common_meta::cache_invalidator::CacheInvalidatorRef; +use common_meta::distributed_time_constants; +use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo}; +use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteValue}; +use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; +use common_meta::peer::Peer; +use common_procedure::StringKey; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::storage::RegionId; +use strum::AsRefStr; + +use crate::cluster::MetaPeerClientRef; +use crate::error::{self, Result}; +use crate::lease::lookup_datanode_peer; +use crate::service::mailbox::MailboxRef; + +#[derive(Clone)] +/// The context of add/remove region follower procedure. +pub struct Context { + /// The table metadata manager. + pub table_metadata_manager: TableMetadataManagerRef, + /// The mailbox. + pub mailbox: MailboxRef, + /// The metasrv's address. + pub server_addr: String, + /// The cache invalidator. + pub cache_invalidator: CacheInvalidatorRef, + /// The meta peer client. + pub meta_peer_client: MetaPeerClientRef, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AlterRegionFollowerData { + /// The catalog name. + pub(crate) catalog: String, + /// The schema name. + pub(crate) schema: String, + /// The region id. + pub(crate) region_id: RegionId, + /// The peer id of the datanode to add region follower. + pub(crate) peer_id: u64, + /// The peer of the datanode to add region follower. + pub(crate) peer: Option, + /// The datanode table value of the region. + pub(crate) datanode_table_value: Option, + /// The physical table route of the region. 
+ pub(crate) table_route: Option<( + DeserializedValueWithBytes, + PhysicalTableRouteValue, + )>, + /// The state. + pub(crate) state: AlterRegionFollowerState, +} + +impl AlterRegionFollowerData { + pub fn lock_key(&self) -> Vec { + let region_id = self.region_id; + let lock_key = vec![ + CatalogLock::Read(&self.catalog).into(), + SchemaLock::read(&self.catalog, &self.schema).into(), + // The optimistic updating of table route is not working very well, + // so we need to use the write lock here. + TableLock::Write(region_id.table_id()).into(), + RegionLock::Write(region_id).into(), + ]; + + lock_key + } + + pub(crate) fn datanode_peer(&self) -> Option<&Peer> { + self.peer.as_ref() + } + + pub(crate) fn physical_table_route(&self) -> Option<&PhysicalTableRouteValue> { + self.table_route + .as_ref() + .map(|(_, table_route)| table_route) + } + + /// Returns the region info of the region. + pub(crate) fn region_info(&self) -> Option { + self.datanode_table_value + .as_ref() + .map(|datanode_table_value| datanode_table_value.region_info.clone()) + } + + /// Loads the datanode peer. + pub(crate) async fn load_datanode_peer(&self, ctx: &Context) -> Result> { + let peer = lookup_datanode_peer( + self.peer_id, + &ctx.meta_peer_client, + distributed_time_constants::DATANODE_LEASE_SECS, + ) + .await? + .context(error::PeerUnavailableSnafu { + peer_id: self.peer_id, + })?; + + Ok(Some(peer)) + } + + /// Loads the datanode table value of the region. 
+ pub(crate) async fn load_datanode_table_value( + &self, + ctx: &Context, + ) -> Result> { + let table_id = self.region_id.table_id(); + let datanode_id = self.peer_id; + let datanode_table_key = DatanodeTableKey { + datanode_id, + table_id, + }; + + let datanode_table_value = ctx + .table_metadata_manager + .datanode_table_manager() + .get(&datanode_table_key) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"), + })? + .context(error::DatanodeTableNotFoundSnafu { + table_id, + datanode_id, + })?; + + Ok(Some(datanode_table_value)) + } + + /// Loads the table route of the region, returns the physical table id. + pub(crate) async fn load_table_route( + &self, + ctx: &Context, + ) -> Result< + Option<( + DeserializedValueWithBytes, + PhysicalTableRouteValue, + )>, + > { + let table_id = self.region_id.table_id(); + let raw_table_route = ctx + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get_with_raw_bytes(table_id) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get TableRoute: {table_id}"), + })? + .context(error::TableRouteNotFoundSnafu { table_id })?; + let table_route = raw_table_route.clone().into_inner(); + + ensure!( + table_route.is_physical(), + error::LogicalTableCannotAddFollowerSnafu { table_id } + ); + + Ok(Some(( + raw_table_route, + table_route.into_physical_table_route(), + ))) + } +} + +#[derive(Debug, Serialize, Deserialize, AsRefStr)] +pub enum AlterRegionFollowerState { + /// Prepares to alter region follower. + Prepare, + /// Sends alter region follower request to Datanode. + SubmitRequest, + /// Updates table metadata. + UpdateMetadata, + /// Broadcasts the invalidate table route cache message. 
+ InvalidateTableCache, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_data_serialization() { + let data = AlterRegionFollowerData { + catalog: "test_catalog".to_string(), + schema: "test_schema".to_string(), + region_id: RegionId::new(1, 1), + peer_id: 1, + peer: None, + datanode_table_value: None, + table_route: None, + state: AlterRegionFollowerState::Prepare, + }; + + assert_eq!(data.region_id.as_u64(), 4294967297); + let serialized = serde_json::to_string(&data).unwrap(); + let expected = r#"{"catalog":"test_catalog","schema":"test_schema","region_id":4294967297,"peer_id":1,"peer":null,"datanode_table_value":null,"table_route":null,"state":"Prepare"}"#; + assert_eq!(expected, serialized); + } +} diff --git a/src/meta-srv/src/procedure/region_follower/add_region_follower.rs b/src/meta-srv/src/procedure/region_follower/add_region_follower.rs new file mode 100644 index 0000000000..dcd5bb0956 --- /dev/null +++ b/src/meta-srv/src/procedure/region_follower/add_region_follower.rs @@ -0,0 +1,247 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use common_meta::instruction::CacheIdent; +use common_procedure::error::ToJsonSnafu; +use common_procedure::{ + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, + Result as ProcedureResult, Status, +}; +use common_telemetry::info; +use snafu::ResultExt; +use store_api::storage::RegionId; + +use super::create::CreateFollower; +use super::{AlterRegionFollowerData, AlterRegionFollowerState, Context}; +use crate::error::{self, Result}; +use crate::metrics; + +/// The procedure to add a region follower. +pub struct AddRegionFollowerProcedure { + pub data: AlterRegionFollowerData, + pub context: Context, +} + +impl AddRegionFollowerProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::AddRegionFollower"; + + pub fn new( + catalog: String, + schema: String, + region_id: RegionId, + peer_id: u64, + context: Context, + ) -> Self { + Self { + data: AlterRegionFollowerData { + catalog, + schema, + region_id, + peer_id, + peer: None, + datanode_table_value: None, + table_route: None, + state: AlterRegionFollowerState::Prepare, + }, + context, + } + } + + pub fn from_json(json: &str, context: Context) -> ProcedureResult { + let data: AlterRegionFollowerData = serde_json::from_str(json).unwrap(); + Ok(Self { data, context }) + } + + pub async fn on_prepare(&mut self) -> Result { + // loads the datanode peer and check peer is alive + self.data.peer = self.data.load_datanode_peer(&self.context).await?; + + // loads the datanode table value + self.data.datanode_table_value = self.data.load_datanode_table_value(&self.context).await?; + + // loads the table route of the region + self.data.table_route = self.data.load_table_route(&self.context).await?; + let table_route = self.data.physical_table_route().unwrap(); + let datanode_peer = self.data.datanode_peer().unwrap(); + + // check if the destination peer is already a leader/follower of the region + for region_route in &table_route.region_routes { + if region_route.region.id != 
self.data.region_id { + continue; + } + let Some(leader_peer) = &region_route.leader_peer else { + continue; + }; + + // check if the destination peer is already a leader of the region + if leader_peer.id == datanode_peer.id { + return error::RegionFollowerLeaderConflictSnafu { + region_id: self.data.region_id, + peer_id: datanode_peer.id, + } + .fail(); + } + + // check if the destination peer is already a follower of the region + if region_route + .follower_peers + .iter() + .any(|peer| peer.id == datanode_peer.id) + { + return error::MultipleRegionFollowersOnSameNodeSnafu { + region_id: self.data.region_id, + peer_id: datanode_peer.id, + } + .fail(); + } + } + + info!( + "Add region({}) follower procedure is preparing, peer: {datanode_peer:?}", + self.data.region_id + ); + + Ok(Status::executing(true)) + } + + pub async fn on_submit_request(&mut self) -> Result { + let region_id = self.data.region_id; + // Safety: we have already set the peer in `on_prepare`. + let peer = self.data.peer.clone().unwrap(); + let create_follower = CreateFollower::new(region_id, peer); + let instruction = create_follower + .build_open_region_instruction(self.data.region_info().unwrap()) + .await?; + create_follower + .send_open_region_instruction(&self.context, instruction) + .await?; + + Ok(Status::executing(true)) + } + + pub async fn on_update_metadata(&mut self) -> Result { + // Safety: we have already loaded the table route in `on_prepare`. + let (current_table_route_value, phy_table_route) = self.data.table_route.as_ref().unwrap(); + + let mut new_region_routes = phy_table_route.region_routes.clone(); + for region_route in &mut new_region_routes { + if region_route.region.id != self.data.region_id { + continue; + } + region_route + .follower_peers + .push(self.data.peer.clone().unwrap()); + } + + // Safety: we have already loaded the region info in `on_prepare`. 
+ let region_info = self.data.region_info().unwrap(); + let new_region_options = region_info.region_options.clone(); + let new_region_wal_options = region_info.region_wal_options.clone(); + + self.context + .table_metadata_manager + .update_table_route( + self.data.region_id.table_id(), + region_info, + current_table_route_value, + new_region_routes, + &new_region_options, + &new_region_wal_options, + ) + .await + .context(error::TableMetadataManagerSnafu)?; + + Ok(Status::executing(true)) + } + + pub async fn on_broadcast(&mut self) -> Result { + let table_id = self.data.region_id.table_id(); + // ignore the result + let ctx = common_meta::cache_invalidator::Context::default(); + let _ = self + .context + .cache_invalidator + .invalidate(&ctx, &[CacheIdent::TableId(table_id)]) + .await; + Ok(Status::executing(true)) + } +} + +#[async_trait::async_trait] +impl Procedure for AddRegionFollowerProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.data.state; + + let _timer = metrics::METRIC_META_ADD_REGION_FOLLOWER_EXECUTE + .with_label_values(&[state.as_ref()]) + .start_timer(); + + match state { + AlterRegionFollowerState::Prepare => self.on_prepare().await, + AlterRegionFollowerState::SubmitRequest => self.on_submit_request().await, + AlterRegionFollowerState::UpdateMetadata => self.on_update_metadata().await, + AlterRegionFollowerState::InvalidateTableCache => self.on_broadcast().await, + } + .map_err(|e| { + if e.is_retryable() { + ProcedureError::retry_later(e) + } else { + ProcedureError::external(e) + } + }) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + LockKey::new(self.data.lock_key()) + } +} + +#[cfg(test)] +mod tests { + use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; + + use super::*; + use 
crate::procedure::region_follower::test_util::TestingEnv; + + #[tokio::test] + async fn test_lock_key() { + let env = TestingEnv::new(); + let context = env.new_context(); + + let procedure = AddRegionFollowerProcedure::new( + "test_catalog".to_string(), + "test_schema".to_string(), + RegionId::new(1, 1), + 1, + context, + ); + + let key = procedure.lock_key(); + let keys = key.keys_to_lock().cloned().collect::>(); + + assert_eq!(keys.len(), 4); + assert!(keys.contains(&CatalogLock::Read("test_catalog").into())); + assert!(keys.contains(&SchemaLock::read("test_catalog", "test_schema").into())); + assert!(keys.contains(&TableLock::Write(1).into())); + assert!(keys.contains(&RegionLock::Write(RegionId::new(1, 1)).into())); + } +} diff --git a/src/meta-srv/src/procedure/region_follower/create.rs b/src/meta-srv/src/procedure/region_follower/create.rs new file mode 100644 index 0000000000..ee281614a8 --- /dev/null +++ b/src/meta-srv/src/procedure/region_follower/create.rs @@ -0,0 +1,252 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::time::{Duration, Instant}; + +use api::v1::meta::MailboxMessage; +use common_meta::distributed_time_constants::REGION_LEASE_SECS; +use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; +use common_meta::key::datanode_table::RegionInfo; +use common_meta::peer::Peer; +use common_meta::RegionIdent; +use common_telemetry::info; +use snafu::ResultExt; +use store_api::storage::RegionId; + +use super::Context; +use crate::error::{self, Result}; +use crate::handler::HeartbeatMailbox; +use crate::service::mailbox::Channel; + +/// Uses lease time of a region as the timeout of opening a follower region. +const OPEN_REGION_FOLLOWER_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS); + +pub(crate) struct CreateFollower { + region_id: RegionId, + // The peer of the datanode to add region follower. + peer: Peer, +} + +impl CreateFollower { + pub fn new(region_id: RegionId, peer: Peer) -> Self { + Self { region_id, peer } + } + + /// Builds the open region instruction for the region follower. + pub(crate) async fn build_open_region_instruction( + &self, + region_info: RegionInfo, + ) -> Result { + let datanode_id = self.peer.id; + let table_id = self.region_id.table_id(); + let region_number = self.region_id.region_number(); + + let RegionInfo { + region_storage_path, + region_options, + region_wal_options, + engine, + } = region_info; + + let region_ident = RegionIdent { + datanode_id, + table_id, + region_number, + engine, + }; + + let open_instruction = Instruction::OpenRegion(OpenRegion::new( + region_ident, + &region_storage_path, + region_options, + region_wal_options, + true, + )); + + Ok(open_instruction) + } + + /// Sends the open region instruction to the datanode. 
+ pub(crate) async fn send_open_region_instruction( + &self, + ctx: &Context, + instruction: Instruction, + ) -> Result<()> { + // TODO(jeremy): register the opening_region_keeper + let msg = MailboxMessage::json_message( + &format!("Open a follower region: {}", self.region_id), + &format!("Metasrv@{}", ctx.server_addr), + &format!("Datanode-{}@{}", self.peer.id, self.peer.addr), + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + let ch = Channel::Datanode(self.peer.id); + let now = Instant::now(); + let receiver = ctx + .mailbox + .send(&ch, msg, OPEN_REGION_FOLLOWER_TIMEOUT) + .await?; + + match receiver.await? { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received open region follower reply: {:?}, region: {}, elapsed: {:?}", + reply, + self.region_id, + now.elapsed() + ); + let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect open region follower reply", + } + .fail(); + }; + + if result { + Ok(()) + } else { + error::RetryLaterSnafu { + reason: format!( + "Region {} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}", + self.region_id, + &self.peer, + now.elapsed() + ), + } + .fail() + } + } + Err(error::Error::MailboxTimeout { .. 
}) => { + let reason = format!( + "Mailbox received timeout for open region follower {} on datanode {:?}, elapsed: {:?}", + self.region_id, + &self.peer, + now.elapsed() + ); + error::RetryLaterSnafu { reason }.fail() + } + Err(e) => Err(e), + } + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::DatanodeId; + + use super::*; + use crate::error::Error; + use crate::procedure::region_follower::test_util::TestingEnv; + use crate::procedure::test_util::{new_close_region_reply, send_mock_reply}; + + #[tokio::test] + async fn test_datanode_is_unreachable() { + let env = TestingEnv::new(); + let ctx = env.new_context(); + + let region_id = RegionId::new(1, 1); + let peer = Peer::new(1, "127.0.0.1:8080"); + let create_follower = CreateFollower::new(region_id, peer.clone()); + let instruction = mock_open_region_instruction(peer.id, region_id); + let err = create_follower + .send_open_region_instruction(&ctx, instruction) + .await + .unwrap_err(); + assert_matches!(err, Error::PusherNotFound { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_unexpected_instruction_reply() { + let mut env = TestingEnv::new(); + let ctx = env.new_context(); + let mailbox_ctx = env.mailbox_context_mut(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let region_id = RegionId::new(1, 1); + let peer = Peer::new(1, "127.0.0.1:8080"); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx) + .await; + + // Sends an unexpected close region reply instead of an open region reply. + send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id))); + + let create_follower = CreateFollower::new(region_id, peer.clone()); + let instruction = mock_open_region_instruction(peer.id, region_id); + let err = create_follower + .send_open_region_instruction(&ctx, instruction) + .await + .unwrap_err(); + assert_matches!(err, Error::UnexpectedInstructionReply { .. 
}); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_instruction_exceeded_deadline() { + let mut env = TestingEnv::new(); + let ctx = env.new_context(); + let mailbox_ctx = env.mailbox_context_mut(); + let mailbox = mailbox_ctx.mailbox().clone(); + + let region_id = RegionId::new(1, 1); + let peer = Peer::new(1, "127.0.0.1:8080"); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + + mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx) + .await; + + // Sends an timeout error. + send_mock_reply(mailbox, rx, |id| { + Err(error::MailboxTimeoutSnafu { id }.build()) + }); + + let create_follower = CreateFollower::new(region_id, peer.clone()); + let instruction = mock_open_region_instruction(peer.id, region_id); + let err = create_follower + .send_open_region_instruction(&ctx, instruction) + .await + .unwrap_err(); + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + } + + fn mock_open_region_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction { + Instruction::OpenRegion(OpenRegion { + region_ident: RegionIdent { + datanode_id, + table_id: region_id.table_id(), + region_number: region_id.region_number(), + engine: "mito2".to_string(), + }, + region_storage_path: "/tmp".to_string(), + region_options: Default::default(), + region_wal_options: Default::default(), + skip_wal_replay: true, + }) + } +} diff --git a/src/meta-srv/src/procedure/region_follower/manager.rs b/src/meta-srv/src/procedure/region_follower/manager.rs new file mode 100644 index 0000000000..40f7da8efd --- /dev/null +++ b/src/meta-srv/src/procedure/region_follower/manager.rs @@ -0,0 +1,193 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_meta::rpc::procedure::{AddRegionFollowerRequest, RemoveRegionFollowerRequest};
+use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId};
+use common_telemetry::info;
+use snafu::{OptionExt, ResultExt};
+use store_api::storage::RegionId;
+use table::table_name::TableName;
+
+use super::remove_region_follower::RemoveRegionFollowerProcedure;
+use crate::error::{self, Result};
+use crate::procedure::region_follower::add_region_follower::AddRegionFollowerProcedure;
+use crate::procedure::region_follower::Context;
+
+/// Registers the region follower procedures and submits them on request.
+pub struct RegionFollowerManager {
+    procedure_manager: ProcedureManagerRef,
+    // Cloned for every procedure created or loaded by this manager.
+    default_context: Context,
+}
+
+impl RegionFollowerManager {
+    /// Creates a manager that submits procedures to `procedure_manager` and hands a
+    /// clone of `default_context` to each of them.
+    pub fn new(procedure_manager: ProcedureManagerRef, default_context: Context) -> Self {
+        Self {
+            procedure_manager,
+            default_context,
+        }
+    }
+
+    /// Returns a fresh clone of the default [`Context`].
+    pub fn new_context(&self) -> Context {
+        self.default_context.clone()
+    }
+
+    /// Registers the loaders of the add and remove region follower procedures, so the
+    /// procedure manager can rebuild them from their JSON dump.
+    ///
+    /// Fails with `RegisterProcedureLoader` if a loader cannot be registered.
+    pub(crate) fn try_start(&self) -> Result<()> {
+        // register add region follower procedure
+        let context = self.new_context();
+        let type_name = AddRegionFollowerProcedure::TYPE_NAME;
+        self.procedure_manager
+            .register_loader(
+                type_name,
+                Box::new(move |json| {
+                    let context = context.clone();
+                    AddRegionFollowerProcedure::from_json(json, context).map(|p| Box::new(p) as _)
+                }),
+            )
+            .context(error::RegisterProcedureLoaderSnafu { type_name })?;
+
+        // register remove region follower procedure
+        let context = self.new_context();
+        let type_name = RemoveRegionFollowerProcedure::TYPE_NAME;
+        self.procedure_manager
+            .register_loader(
+                type_name,
+                Box::new(move |json| {
+                    let context = context.clone();
+                    RemoveRegionFollowerProcedure::from_json(json, context)
+                        .map(|p| Box::new(p) as _)
+                }),
+            )
+            .context(error::RegisterProcedureLoaderSnafu { type_name })?;
+        Ok(())
+    }
+
+    /// Submits an [`AddRegionFollowerProcedure`] for `req` and waits until it finishes.
+    ///
+    /// The catalog and schema names are resolved from the table info of the region's
+    /// table; a missing table is reported as `TableInfoNotFound` before anything is
+    /// submitted. Returns the procedure id together with the procedure output.
+    pub async fn submit_add_follower_procedure(
+        &self,
+        req: AddRegionFollowerRequest,
+    ) -> Result<(ProcedureId, Option<Output>)> {
+        let AddRegionFollowerRequest { region_id, peer_id } = req;
+        let region_id = RegionId::from_u64(region_id);
+        let table_id = region_id.table_id();
+        let ctx = self.new_context();
+
+        // get the table info
+        let table_info = ctx
+            .table_metadata_manager
+            .table_info_manager()
+            .get(table_id)
+            .await
+            .context(error::TableMetadataManagerSnafu)?
+            .context(error::TableInfoNotFoundSnafu { table_id })?
+            .into_inner();
+
+        let TableName {
+            catalog_name,
+            schema_name,
+            ..
+        } = table_info.table_name();
+
+        let procedure =
+            AddRegionFollowerProcedure::new(catalog_name, schema_name, region_id, peer_id, ctx);
+
+        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
+        let procedure_id = procedure_with_id.id;
+        info!("Starting add region follower procedure {procedure_id} for {req:?}");
+        let mut watcher = self
+            .procedure_manager
+            .submit(procedure_with_id)
+            .await
+            .context(error::SubmitProcedureSnafu)?;
+        // Block until the procedure reaches a terminal state.
+        let output = watcher::wait(&mut watcher)
+            .await
+            .context(error::WaitProcedureSnafu)?;
+
+        Ok((procedure_id, output))
+    }
+
+    /// Submits a [`RemoveRegionFollowerProcedure`] for `req` and waits until it finishes.
+    ///
+    /// The catalog and schema names are resolved from the table info of the region's
+    /// table; a missing table is reported as `TableInfoNotFound` before anything is
+    /// submitted. Returns the procedure id together with the procedure output.
+    pub async fn submit_remove_follower_procedure(
+        &self,
+        req: RemoveRegionFollowerRequest,
+    ) -> Result<(ProcedureId, Option<Output>)> {
+        let RemoveRegionFollowerRequest { region_id, peer_id } = req;
+        let region_id = RegionId::from_u64(region_id);
+        let table_id = region_id.table_id();
+        let ctx = self.new_context();
+
+        // get the table info
+        let table_info = ctx
+            .table_metadata_manager
+            .table_info_manager()
+            .get(table_id)
+            .await
+            .context(error::TableMetadataManagerSnafu)?
+            .context(error::TableInfoNotFoundSnafu { table_id })?
+            .into_inner();
+
+        let TableName {
+            catalog_name,
+            schema_name,
+            ..
+        } = table_info.table_name();
+
+        let procedure =
+            RemoveRegionFollowerProcedure::new(catalog_name, schema_name, region_id, peer_id, ctx);
+
+        let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
+        let procedure_id = procedure_with_id.id;
+        info!("Starting remove region follower procedure {procedure_id} for {req:?}");
+        let mut watcher = self
+            .procedure_manager
+            .submit(procedure_with_id)
+            .await
+            .context(error::SubmitProcedureSnafu)?;
+        // Block until the procedure reaches a terminal state.
+        let output = watcher::wait(&mut watcher)
+            .await
+            .context(error::WaitProcedureSnafu)?;
+
+        Ok((procedure_id, output))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::assert_matches::assert_matches;
+
+    use super::*;
+    use crate::procedure::region_follower::test_util::TestingEnv;
+
+    #[tokio::test]
+    async fn test_submit_procedure_table_not_found() {
+        let env = TestingEnv::new();
+        let ctx = env.new_context();
+        let region_follower_manager = RegionFollowerManager::new(env.procedure_manager(), ctx);
+        // No table has been created in the testing env, so both requests must fail
+        // while looking up the table info.
+        let req = AddRegionFollowerRequest {
+            region_id: 1,
+            peer_id: 2,
+        };
+        let err = region_follower_manager
+            .submit_add_follower_procedure(req)
+            .await
+            .unwrap_err();
+        assert_matches!(err, error::Error::TableInfoNotFound { .. });
+
+        let req = RemoveRegionFollowerRequest {
+            region_id: 1,
+            peer_id: 2,
+        };
+        let err = region_follower_manager
+            .submit_remove_follower_procedure(req)
+            .await
+            .unwrap_err();
+        assert_matches!(err, error::Error::TableInfoNotFound { .. });
+    }
+}
diff --git a/src/meta-srv/src/procedure/region_follower/remove.rs b/src/meta-srv/src/procedure/region_follower/remove.rs
new file mode 100644
index 0000000000..c066812931
--- /dev/null
+++ b/src/meta-srv/src/procedure/region_follower/remove.rs
@@ -0,0 +1,234 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::{Duration, Instant};
+
+use api::v1::meta::MailboxMessage;
+use common_meta::distributed_time_constants::REGION_LEASE_SECS;
+use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
+use common_meta::key::datanode_table::RegionInfo;
+use common_meta::peer::Peer;
+use common_meta::RegionIdent;
+use common_telemetry::info;
+use snafu::ResultExt;
+use store_api::storage::RegionId;
+
+use super::Context;
+use crate::error::{self, Result};
+use crate::handler::HeartbeatMailbox;
+use crate::service::mailbox::Channel;
+
+/// Uses lease time of a region as the timeout of closing a follower region.
+const CLOSE_REGION_FOLLOWER_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
+
+/// Closes a follower region by sending a close region instruction to its datanode.
+pub(crate) struct RemoveFollower {
+    region_id: RegionId,
+    // The peer of the datanode to remove the region follower from.
+    peer: Peer,
+}
+
+impl RemoveFollower {
+    /// Creates a remover for the follower of `region_id` hosted on `peer`.
+    pub fn new(region_id: RegionId, peer: Peer) -> Self {
+        Self { region_id, peer }
+    }
+
+    /// Builds the close region instruction for the region follower.
+    ///
+    /// Only the `engine` of `region_info` is used; the datanode, table and region
+    /// number come from `self`.
+    pub(crate) async fn build_close_region_instruction(
+        &self,
+        region_info: RegionInfo,
+    ) -> Result<Instruction> {
+        let datanode_id = self.peer.id;
+        let table_id = self.region_id.table_id();
+        let region_number = self.region_id.region_number();
+
+        let RegionInfo { engine, .. } = region_info;
+
+        let region_ident = RegionIdent {
+            datanode_id,
+            table_id,
+            region_number,
+            engine,
+        };
+
+        let close_instruction = Instruction::CloseRegion(region_ident);
+
+        Ok(close_instruction)
+    }
+
+    /// Sends the close region instruction to the datanode.
+    ///
+    /// Returns `Ok(())` only when the datanode replies that the region is closed. A
+    /// negative reply or a mailbox timeout is reported as [`error::Error::RetryLater`];
+    /// a reply of another kind is reported as
+    /// [`error::Error::UnexpectedInstructionReply`]. Any other mailbox error is
+    /// returned unchanged.
+    pub(crate) async fn send_close_region_instruction(
+        &self,
+        ctx: &Context,
+        instruction: Instruction,
+    ) -> Result<()> {
+        let msg = MailboxMessage::json_message(
+            &format!("Close a follower region: {}", self.region_id),
+            &format!("Metasrv@{}", ctx.server_addr),
+            &format!("Datanode-{}@{}", self.peer.id, self.peer.addr),
+            common_time::util::current_time_millis(),
+            &instruction,
+        )
+        .with_context(|_| error::SerializeToJsonSnafu {
+            input: instruction.to_string(),
+        })?;
+
+        let ch = Channel::Datanode(self.peer.id);
+        // Measured only for the log and error messages below.
+        let now = Instant::now();
+        let receiver = ctx
+            .mailbox
+            .send(&ch, msg, CLOSE_REGION_FOLLOWER_TIMEOUT)
+            .await?;
+
+        match receiver.await? {
+            Ok(msg) => {
+                let reply = HeartbeatMailbox::json_reply(&msg)?;
+                info!(
+                    "Received close region follower reply: {:?}, region: {}, elapsed: {:?}",
+                    reply,
+                    self.region_id,
+                    now.elapsed()
+                );
+                let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else {
+                    return error::UnexpectedInstructionReplySnafu {
+                        mailbox_message: msg.to_string(),
+                        reason: "expect close region follower reply",
+                    }
+                    .fail();
+                };
+
+                if result {
+                    Ok(())
+                } else {
+                    error::RetryLaterSnafu {
+                        reason: format!(
+                            "Region {} is not closed by datanode {:?}, error: {error:?}, elapsed: {:?}",
+                            self.region_id,
+                            &self.peer,
+                            now.elapsed()
+                        ),
+                    }
+                    .fail()
+                }
+            }
+            Err(error::Error::MailboxTimeout { .. }) => {
+                let reason = format!(
+                    "Mailbox received timeout for close region follower {} on datanode {:?}, elapsed: {:?}",
+                    self.region_id,
+                    &self.peer,
+                    now.elapsed()
+                );
+                error::RetryLaterSnafu { reason }.fail()
+            }
+            Err(e) => Err(e),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::assert_matches::assert_matches;
+
+    use common_meta::DatanodeId;
+
+    use super::*;
+    use crate::error::Error;
+    use crate::procedure::region_follower::test_util::TestingEnv;
+    use crate::procedure::test_util::{new_open_region_reply, send_mock_reply};
+
+    #[tokio::test]
+    async fn test_datanode_is_unreachable() {
+        let env = TestingEnv::new();
+        let ctx = env.new_context();
+
+        // No pusher is registered for the datanode, so sending must fail.
+        let region_id = RegionId::new(1, 1);
+        let peer = Peer::new(1, "127.0.0.1:8080");
+        let remove_follower = RemoveFollower::new(region_id, peer.clone());
+        let instruction = mock_close_region_instruction(peer.id, region_id);
+        let err = remove_follower
+            .send_close_region_instruction(&ctx, instruction)
+            .await
+            .unwrap_err();
+        assert_matches!(err, Error::PusherNotFound { .. });
+        assert!(!err.is_retryable());
+    }
+
+    #[tokio::test]
+    async fn test_unexpected_instruction_reply() {
+        let mut env = TestingEnv::new();
+        let ctx = env.new_context();
+        let mailbox_ctx = env.mailbox_context_mut();
+        let mailbox = mailbox_ctx.mailbox().clone();
+
+        let region_id = RegionId::new(1, 1);
+        let peer = Peer::new(1, "127.0.0.1:8080");
+
+        let (tx, rx) = tokio::sync::mpsc::channel(1);
+
+        mailbox_ctx
+            .insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
+            .await;
+
+        // Replies with an open region reply instead of the expected close region reply.
+        send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, false, None)));
+
+        let remove_follower = RemoveFollower::new(region_id, peer.clone());
+        let instruction = mock_close_region_instruction(peer.id, region_id);
+        let err = remove_follower
+            .send_close_region_instruction(&ctx, instruction)
+            .await
+            .unwrap_err();
+        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
+        assert!(!err.is_retryable());
+    }
+
+    #[tokio::test]
+    async fn test_instruction_exceeded_deadline() {
+        let mut env = TestingEnv::new();
+        let ctx = env.new_context();
+        let mailbox_ctx = env.mailbox_context_mut();
+        let mailbox = mailbox_ctx.mailbox().clone();
+
+        let region_id = RegionId::new(1, 1);
+        let peer = Peer::new(1, "127.0.0.1:8080");
+
+        let (tx, rx) = tokio::sync::mpsc::channel(1);
+
+        mailbox_ctx
+            .insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx)
+            .await;
+
+        // Sends a timeout error.
+        send_mock_reply(mailbox, rx, |id| {
+            Err(error::MailboxTimeoutSnafu { id }.build())
+        });
+
+        let remove_follower = RemoveFollower::new(region_id, peer.clone());
+        let instruction = mock_close_region_instruction(peer.id, region_id);
+        let err = remove_follower
+            .send_close_region_instruction(&ctx, instruction)
+            .await
+            .unwrap_err();
+        assert_matches!(err, Error::RetryLater { .. });
+        assert!(err.is_retryable());
+    }
+
+    /// Builds a close region instruction for `region_id` on `datanode_id`.
+    fn mock_close_region_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
+        Instruction::CloseRegion(RegionIdent {
+            datanode_id,
+            table_id: region_id.table_id(),
+            region_number: region_id.region_number(),
+            engine: "mito2".to_string(),
+        })
+    }
+}
diff --git a/src/meta-srv/src/procedure/region_follower/remove_region_follower.rs b/src/meta-srv/src/procedure/region_follower/remove_region_follower.rs
new file mode 100644
index 0000000000..97e9e19776
--- /dev/null
+++ b/src/meta-srv/src/procedure/region_follower/remove_region_follower.rs
@@ -0,0 +1,233 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_meta::instruction::CacheIdent;
+use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
+use common_procedure::{
+    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
+    Result as ProcedureResult, Status,
+};
+use common_telemetry::info;
+use snafu::{ensure, ResultExt};
+use store_api::storage::RegionId;
+
+use super::remove::RemoveFollower;
+use super::{AlterRegionFollowerData, AlterRegionFollowerState, Context};
+use crate::error::{self, Result};
+use crate::metrics;
+
+/// The procedure to remove a region follower.
+///
+/// It walks through the [`AlterRegionFollowerState`]s in order: `Prepare`,
+/// `SubmitRequest`, `UpdateMetadata` and `InvalidateTableCache`.
+pub struct RemoveRegionFollowerProcedure {
+    pub data: AlterRegionFollowerData,
+    pub context: Context,
+}
+
+impl RemoveRegionFollowerProcedure {
+    pub const TYPE_NAME: &'static str = "metasrv-procedure::RemoveRegionFollower";
+
+    /// Creates a procedure in the `Prepare` state that removes the follower of
+    /// `region_id` from the datanode `peer_id`.
+    pub fn new(
+        catalog: String,
+        schema: String,
+        region_id: RegionId,
+        peer_id: u64,
+        context: Context,
+    ) -> Self {
+        Self {
+            data: AlterRegionFollowerData {
+                catalog,
+                schema,
+                region_id,
+                peer_id,
+                peer: None,
+                datanode_table_value: None,
+                table_route: None,
+                state: AlterRegionFollowerState::Prepare,
+            },
+            context,
+        }
+    }
+
+    /// Restores a procedure from the JSON produced by [`Procedure::dump`].
+    ///
+    /// Malformed JSON is reported as an error instead of panicking, because the
+    /// loader runs on data read back from the procedure store.
+    pub fn from_json(json: &str, context: Context) -> ProcedureResult<Self> {
+        let data: AlterRegionFollowerData = serde_json::from_str(json).context(FromJsonSnafu)?;
+        Ok(Self { data, context })
+    }
+
+    /// Loads the datanode peer, the datanode table value and the table route, and
+    /// checks that the peer really is a follower of the region.
+    ///
+    /// Fails with `RegionFollowerNotExists` if the region route does not list the
+    /// peer as a follower.
+    pub async fn on_prepare(&mut self) -> Result<Status> {
+        // loads the datanode peer and check peer is alive
+        self.data.peer = self.data.load_datanode_peer(&self.context).await?;
+
+        // loads the datanode table value
+        self.data.datanode_table_value = self.data.load_datanode_table_value(&self.context).await?;
+
+        // loads the table route of the region
+        self.data.table_route = self.data.load_table_route(&self.context).await?;
+        let table_route = self.data.physical_table_route().unwrap();
+
+        // check that the peer is a follower of this region: there is nothing to
+        // remove otherwise
+        for region_route in &table_route.region_routes {
+            if region_route.region.id != self.data.region_id {
+                continue;
+            }
+            ensure!(
+                region_route
+                    .follower_peers
+                    .iter()
+                    .any(|peer| peer.id == self.data.peer_id),
+                error::RegionFollowerNotExistsSnafu {
+                    region_id: self.data.region_id,
+                    peer_id: self.data.peer_id,
+                }
+            );
+        }
+
+        info!(
+            "Remove region({}) follower procedure is preparing, peer: {:?}",
+            self.data.region_id,
+            self.data.datanode_peer()
+        );
+
+        // Move on, otherwise `execute` would run this step again.
+        self.data.state = AlterRegionFollowerState::SubmitRequest;
+        Ok(Status::executing(true))
+    }
+
+    /// Asks the datanode to close the follower region.
+    pub async fn on_submit_request(&mut self) -> Result<Status> {
+        let region_id = self.data.region_id;
+        // Safety: we have already set the peer in `on_prepare`.
+        let peer = self.data.peer.clone().unwrap();
+        let remove_follower = RemoveFollower::new(region_id, peer);
+        let instruction = remove_follower
+            .build_close_region_instruction(self.data.region_info().unwrap())
+            .await?;
+        remove_follower
+            .send_close_region_instruction(&self.context, instruction)
+            .await?;
+
+        // The state only advances after a successful close, so a failed request is
+        // sent again when the procedure is retried.
+        self.data.state = AlterRegionFollowerState::UpdateMetadata;
+        Ok(Status::executing(true))
+    }
+
+    /// Removes the peer from the follower peers of the region route and stores the
+    /// new table route.
+    pub async fn on_update_metadata(&mut self) -> Result<Status> {
+        // Safety: we have already load the table route in `on_prepare`.
+        let (current_table_route_value, phy_table_route) = self.data.table_route.as_ref().unwrap();
+
+        let mut new_region_routes = phy_table_route.region_routes.clone();
+        for region_route in &mut new_region_routes {
+            if region_route.region.id != self.data.region_id {
+                continue;
+            }
+            // remove the follower peer from the region route
+            region_route
+                .follower_peers
+                .retain(|peer| peer.id != self.data.peer_id);
+        }
+
+        // Safety: we have already load the region info in `on_prepare`.
+        let region_info = self.data.region_info().unwrap();
+        let new_region_options = region_info.region_options.clone();
+        let new_region_wal_options = region_info.region_wal_options.clone();
+
+        self.context
+            .table_metadata_manager
+            .update_table_route(
+                self.data.region_id.table_id(),
+                region_info,
+                current_table_route_value,
+                new_region_routes,
+                &new_region_options,
+                &new_region_wal_options,
+            )
+            .await
+            .context(error::TableMetadataManagerSnafu)?;
+
+        self.data.state = AlterRegionFollowerState::InvalidateTableCache;
+        Ok(Status::executing(true))
+    }
+
+    /// Broadcasts the invalidation of the table cache and finishes the procedure.
+    pub async fn on_broadcast(&mut self) -> Result<Status> {
+        let table_id = self.data.region_id.table_id();
+        // ignore the result
+        let ctx = common_meta::cache_invalidator::Context::default();
+        let _ = self
+            .context
+            .cache_invalidator
+            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
+            .await;
+        // This is the last step.
+        Ok(Status::done())
+    }
+}
+
+#[async_trait::async_trait]
+impl Procedure for RemoveRegionFollowerProcedure {
+    fn type_name(&self) -> &str {
+        Self::TYPE_NAME
+    }
+
+    /// Runs the step of the current state; the step's duration is recorded under
+    /// the state name.
+    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
+        let state = &self.data.state;
+
+        let _timer = metrics::METRIC_META_REMOVE_REGION_FOLLOWER_EXECUTE
+            .with_label_values(&[state.as_ref()])
+            .start_timer();
+
+        match state {
+            AlterRegionFollowerState::Prepare => self.on_prepare().await,
+            AlterRegionFollowerState::SubmitRequest => self.on_submit_request().await,
+            AlterRegionFollowerState::UpdateMetadata => self.on_update_metadata().await,
+            AlterRegionFollowerState::InvalidateTableCache => self.on_broadcast().await,
+        }
+        .map_err(|e| {
+            // Retryable errors make the procedure framework run the step again.
+            if e.is_retryable() {
+                ProcedureError::retry_later(e)
+            } else {
+                ProcedureError::external(e)
+            }
+        })
+    }
+
+    fn dump(&self) -> ProcedureResult<String> {
+        serde_json::to_string(&self.data).context(ToJsonSnafu)
+    }
+
+    fn lock_key(&self) -> LockKey {
+        LockKey::new(self.data.lock_key())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
+
+    use super::*;
+    use crate::procedure::region_follower::test_util::TestingEnv;
+
+    #[tokio::test]
+    async fn test_lock_key() {
+        let env = TestingEnv::new();
+        let context = env.new_context();
+
+        let procedure = RemoveRegionFollowerProcedure::new(
+            "test_catalog".to_string(),
+            "test_schema".to_string(),
+            RegionId::new(1, 1),
+            1,
+            context,
+        );
+
+        let key = procedure.lock_key();
+        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
+
+        assert_eq!(keys.len(), 4);
+        assert!(keys.contains(&CatalogLock::Read("test_catalog").into()));
+        assert!(keys.contains(&SchemaLock::read("test_catalog", "test_schema").into()));
+        assert!(keys.contains(&TableLock::Write(1).into()));
+        assert!(keys.contains(&RegionLock::Write(RegionId::new(1, 1)).into()));
+    }
+}
diff --git a/src/meta-srv/src/procedure/region_follower/test_util.rs b/src/meta-srv/src/procedure/region_follower/test_util.rs
new file mode 100644
index 0000000000..69e7b42583
--- /dev/null
+++ b/src/meta-srv/src/procedure/region_follower/test_util.rs
@@ -0,0 +1,97 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
+use common_meta::kv_backend::memory::MemoryKvBackend;
+use common_meta::kv_backend::ResettableKvBackendRef;
+use common_meta::sequence::SequenceBuilder;
+use common_meta::state_store::KvStateStore;
+use common_procedure::local::{LocalManager, ManagerConfig};
+use common_procedure::ProcedureManagerRef;
+
+use super::Context;
+use crate::cache_invalidator::MetasrvCacheInvalidator;
+use crate::cluster::MetaPeerClientBuilder;
+use crate::metasrv::MetasrvInfo;
+use crate::procedure::test_util::MailboxContext;
+
+/// `TestingEnv` provides components during the tests.
+///
+/// Everything is backed by in-memory key-value stores.
+pub struct TestingEnv {
+    table_metadata_manager: TableMetadataManagerRef,
+    mailbox_ctx: MailboxContext,
+    server_addr: String,
+    procedure_manager: ProcedureManagerRef,
+    // Backend handed to the meta peer client; separate from the metadata backend.
+    in_memory: ResettableKvBackendRef,
+}
+
+impl Default for TestingEnv {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl TestingEnv {
+    /// Creates an environment whose table metadata manager, mailbox sequence and
+    /// procedure state store all share one in-memory key-value backend.
+    pub fn new() -> Self {
+        let kv_backend = Arc::new(MemoryKvBackend::new());
+        let in_memory = Arc::new(MemoryKvBackend::new());
+        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
+
+        let mailbox_sequence =
+            SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
+
+        let mailbox_ctx = MailboxContext::new(mailbox_sequence);
+
+        let state_store = Arc::new(KvStateStore::new(kv_backend));
+        let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
+
+        Self {
+            table_metadata_manager,
+            mailbox_ctx,
+            server_addr: "localhost".to_string(),
+            procedure_manager,
+            in_memory,
+        }
+    }
+
+    /// Builds a region follower [`Context`] from the components of this environment.
+    ///
+    /// The cache invalidator shares the mailbox of the environment; the meta peer
+    /// client is built without an election.
+    pub fn new_context(&self) -> Context {
+        Context {
+            table_metadata_manager: self.table_metadata_manager.clone(),
+            mailbox: self.mailbox_ctx.mailbox().clone(),
+            server_addr: self.server_addr.clone(),
+            cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
+                self.mailbox_ctx.mailbox().clone(),
+                MetasrvInfo {
+                    server_addr: self.server_addr.to_string(),
+                },
+            )),
+            meta_peer_client: MetaPeerClientBuilder::default()
+                .election(None)
+                .in_memory(self.in_memory.clone())
+                .build()
+                .map(Arc::new)
+                // Safety: all required fields set at initialization
+                .unwrap(),
+        }
+    }
+
+    /// Returns the mailbox context, e.g. to register heartbeat response receivers.
+    pub fn mailbox_context_mut(&mut self) -> &mut MailboxContext {
+        &mut self.mailbox_ctx
+    }
+
+    /// Returns a handle to the procedure manager of this environment.
+    pub fn procedure_manager(&self) -> ProcedureManagerRef {
+        self.procedure_manager.clone()
+    }
+}
diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs
index 59f476ce68..f13939195b 100644
--- a/src/meta-srv/src/procedure/region_migration.rs
+++ b/src/meta-srv/src/procedure/region_migration.rs
@@ -579,6 +579,9 @@ mod tests {
     use crate::handler::HeartbeatMailbox;
     use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
     use crate::procedure::region_migration::test_util::*;
+    use crate::procedure::test_util::{
+        new_downgrade_region_reply, new_open_region_reply, new_upgrade_region_reply,
+    };
     use crate::service::mailbox::Channel;
 
     fn new_persistent_context() -> PersistentContext {
diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs
index ca451e6238..4e09e421d0 100644
--- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs
@@ -84,7 +84,7 @@ impl CloseDowngradedRegion {
         let downgrade_leader_datanode = &pc.from_peer;
         let msg = MailboxMessage::json_message(
             &format!("Close downgraded region: {}", region_id),
-            &format!("Meta@{}", ctx.server_addr()),
+            &format!("Metasrv@{}", ctx.server_addr()),
             &format!(
                 "Datanode-{}@{}",
                 downgrade_leader_datanode.id, downgrade_leader_datanode.addr
diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
index 
d1dfcd3e05..e93c787e17 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -150,7 +150,7 @@ impl DowngradeLeaderRegion { let leader = &ctx.persistent_ctx.from_peer; let msg = MailboxMessage::json_message( &format!("Downgrade leader region: {}", region_id), - &format!("Meta@{}", ctx.server_addr()), + &format!("Metasrv@{}", ctx.server_addr()), &format!("Datanode-{}@{}", leader.id, leader.addr), common_time::util::current_time_millis(), &downgrade_instruction, @@ -282,10 +282,11 @@ mod tests { use super::*; use crate::error::Error; - use crate::procedure::region_migration::test_util::{ - new_close_region_reply, new_downgrade_region_reply, send_mock_reply, TestingEnv, - }; + use crate::procedure::region_migration::test_util::TestingEnv; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; + use crate::procedure::test_util::{ + new_close_region_reply, new_downgrade_region_reply, send_mock_reply, + }; fn new_persistent_context() -> PersistentContext { PersistentContext { diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 679dfd1355..6cacf75063 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -126,7 +126,7 @@ impl OpenCandidateRegion { let msg = MailboxMessage::json_message( &format!("Open candidate region: {}", region_id), - &format!("Meta@{}", ctx.server_addr()), + &format!("Metasrv@{}", ctx.server_addr()), &format!("Datanode-{}@{}", candidate.id, candidate.addr), common_time::util::current_time_millis(), &open_instruction, @@ -200,10 +200,11 @@ mod tests { use super::*; use crate::error::Error; - use crate::procedure::region_migration::test_util::{ - self, new_close_region_reply, new_open_region_reply, send_mock_reply, 
TestingEnv, - }; + use crate::procedure::region_migration::test_util::{self, TestingEnv}; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; + use crate::procedure::test_util::{ + new_close_region_reply, new_open_region_reply, send_mock_reply, + }; fn new_persistent_context() -> PersistentContext { test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 40d8325c89..9a32f9d301 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -18,12 +18,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::mailbox_message::Payload; -use api::v1::meta::{HeartbeatResponse, MailboxMessage}; +use api::v1::meta::MailboxMessage; use common_meta::ddl::NoopRegionFailureDetectorControl; -use common_meta::instruction::{ - DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, -}; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -31,22 +27,19 @@ use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::rpc::router::RegionRoute; -use common_meta::sequence::{Sequence, SequenceBuilder}; +use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; use common_meta::DatanodeId; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::{Context as ProcedureContext, ProcedureId, ProcedureManagerRef, Status}; use common_procedure_test::MockContextProvider; use common_telemetry::debug; -use common_time::util::current_time_millis; use futures::future::BoxFuture; use 
store_api::storage::RegionId; use table::metadata::RawTableInfo; -use tokio::sync::mpsc::{Receiver, Sender}; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::error::{self, Error, Result}; -use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::metasrv::MetasrvInfo; use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; @@ -59,40 +52,8 @@ use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandida use crate::procedure::region_migration::{ Context, ContextFactory, DefaultContextFactory, PersistentContext, State, VolatileContext, }; -use crate::service::mailbox::{Channel, MailboxRef}; - -pub type MockHeartbeatReceiver = Receiver>; - -/// The context of mailbox. -pub struct MailboxContext { - mailbox: MailboxRef, - // The pusher is used in the mailbox. - pushers: Pushers, -} - -impl MailboxContext { - pub fn new(sequence: Sequence) -> Self { - let pushers = Pushers::default(); - let mailbox = HeartbeatMailbox::create(pushers.clone(), sequence); - - Self { mailbox, pushers } - } - - /// Inserts a pusher for `datanode_id` - pub async fn insert_heartbeat_response_receiver( - &mut self, - channel: Channel, - tx: Sender>, - ) { - let pusher_id = channel.pusher_id(); - let pusher = Pusher::new(tx); - let _ = self.pushers.insert(pusher_id.string_key(), pusher).await; - } - - pub fn mailbox(&self) -> &MailboxRef { - &self.mailbox - } -} +use crate::procedure::test_util::{send_mock_reply, MailboxContext}; +use crate::service::mailbox::Channel; /// `TestingEnv` provides components during the tests. 
pub struct TestingEnv { @@ -157,7 +118,7 @@ impl TestingEnv { server_addr: self.server_addr.to_string(), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( - self.mailbox_ctx.mailbox.clone(), + self.mailbox_ctx.mailbox().clone(), MetasrvInfo { server_addr: self.server_addr.to_string(), }, @@ -210,105 +171,6 @@ impl TestingEnv { } } -/// Generates a [InstructionReply::OpenRegion] reply. -pub(crate) fn new_open_region_reply( - id: u64, - result: bool, - error: Option, -) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error })) - .unwrap(), - )), - } -} - -/// Generates a [InstructionReply::CloseRegion] reply. -pub fn new_close_region_reply(id: u64) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply { - result: false, - error: None, - })) - .unwrap(), - )), - } -} - -/// Generates a [InstructionReply::DowngradeRegion] reply. -pub fn new_downgrade_region_reply( - id: u64, - last_entry_id: Option, - exist: bool, - error: Option, -) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id, - exists: exist, - error, - })) - .unwrap(), - )), - } -} - -/// Generates a [InstructionReply::UpgradeRegion] reply. 
-pub fn new_upgrade_region_reply( - id: u64, - ready: bool, - exists: bool, - error: Option, -) -> MailboxMessage { - MailboxMessage { - id, - subject: "mock".to_string(), - from: "datanode".to_string(), - to: "meta".to_string(), - timestamp_millis: current_time_millis(), - payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready, - exists, - error, - })) - .unwrap(), - )), - } -} - -/// Sends a mock reply. -pub fn send_mock_reply( - mailbox: MailboxRef, - mut rx: MockHeartbeatReceiver, - msg: impl Fn(u64) -> Result + Send + 'static, -) { - common_runtime::spawn_global(async move { - while let Some(Ok(resp)) = rx.recv().await { - let reply_id = resp.mailbox_message.unwrap().id; - mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap(); - } - }); -} - /// Generates a [PersistentContext]. pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext { PersistentContext { diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 6ed8e4905b..6f42c670dd 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -112,7 +112,7 @@ impl UpgradeCandidateRegion { let msg = MailboxMessage::json_message( &format!("Upgrade candidate region: {}", region_id), - &format!("Meta@{}", ctx.server_addr()), + &format!("Metasrv@{}", ctx.server_addr()), &format!("Datanode-{}@{}", candidate.id, candidate.addr), common_time::util::current_time_millis(), &upgrade_instruction, @@ -226,10 +226,11 @@ mod tests { use super::*; use crate::error::Error; - use crate::procedure::region_migration::test_util::{ - new_close_region_reply, new_upgrade_region_reply, send_mock_reply, TestingEnv, - }; + use crate::procedure::region_migration::test_util::TestingEnv; use 
crate::procedure::region_migration::{ContextFactory, PersistentContext}; + use crate::procedure::test_util::{ + new_close_region_reply, new_upgrade_region_reply, send_mock_reply, + }; fn new_persistent_context() -> PersistentContext { PersistentContext { diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs new file mode 100644 index 0000000000..62f60447c3 --- /dev/null +++ b/src/meta-srv/src/procedure/test_util.rs @@ -0,0 +1,158 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::mailbox_message::Payload; +use api::v1::meta::{HeartbeatResponse, MailboxMessage}; +use common_meta::instruction::{ + DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, +}; +use common_meta::sequence::Sequence; +use common_time::util::current_time_millis; +use tokio::sync::mpsc::{Receiver, Sender}; + +use crate::error::Result; +use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; +use crate::service::mailbox::{Channel, MailboxRef}; + +pub type MockHeartbeatReceiver = Receiver>; + +/// The context of mailbox. +pub struct MailboxContext { + mailbox: MailboxRef, + // The pusher is used in the mailbox. 
+ pushers: Pushers, +} + +impl MailboxContext { + pub fn new(sequence: Sequence) -> Self { + let pushers = Pushers::default(); + let mailbox = HeartbeatMailbox::create(pushers.clone(), sequence); + + Self { mailbox, pushers } + } + + /// Inserts a pusher for `datanode_id` + pub async fn insert_heartbeat_response_receiver( + &mut self, + channel: Channel, + tx: Sender>, + ) { + let pusher_id = channel.pusher_id(); + let pusher = Pusher::new(tx); + let _ = self.pushers.insert(pusher_id.string_key(), pusher).await; + } + + pub fn mailbox(&self) -> &MailboxRef { + &self.mailbox + } +} + +/// Sends a mock reply. +pub fn send_mock_reply( + mailbox: MailboxRef, + mut rx: MockHeartbeatReceiver, + msg: impl Fn(u64) -> Result + Send + 'static, +) { + common_runtime::spawn_global(async move { + while let Some(Ok(resp)) = rx.recv().await { + let reply_id = resp.mailbox_message.unwrap().id; + mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap(); + } + }); +} + +/// Generates a [InstructionReply::OpenRegion] reply. +pub(crate) fn new_open_region_reply( + id: u64, + result: bool, + error: Option, +) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error })) + .unwrap(), + )), + } +} + +/// Generates a [InstructionReply::CloseRegion] reply. +pub fn new_close_region_reply(id: u64) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply { + result: false, + error: None, + })) + .unwrap(), + )), + } +} + +/// Generates a [InstructionReply::DowngradeRegion] reply. 
+pub fn new_downgrade_region_reply( + id: u64, + last_entry_id: Option, + exist: bool, + error: Option, +) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply { + last_entry_id, + exists: exist, + error, + })) + .unwrap(), + )), + } +} + +/// Generates a [InstructionReply::UpgradeRegion] reply. +pub fn new_upgrade_region_reply( + id: u64, + ready: bool, + exists: bool, + error: Option, +) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply { + ready, + exists, + error, + })) + .unwrap(), + )), + } +}