diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 1c67830736..6038bc6901 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -787,41 +787,6 @@ pub enum Error { location: Location, source: common_meta::error::Error, }, - - #[snafu(display("Logical table cannot add follower: {table_id}"))] - LogicalTableCannotAddFollower { - table_id: TableId, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display( - "A region follower cannot be placed on the same node as the leader: {region_id}, {peer_id}" - ))] - RegionFollowerLeaderConflict { - region_id: RegionId, - peer_id: u64, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display( - "Multiple region followers cannot be placed on the same node: {region_id}, {peer_id}" - ))] - MultipleRegionFollowersOnSameNode { - region_id: RegionId, - peer_id: u64, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Region follower not exists: {region_id}, {peer_id}"))] - RegionFollowerNotExists { - region_id: RegionId, - peer_id: u64, - #[snafu(implicit)] - location: Location, - }, } impl Error { @@ -891,10 +856,6 @@ impl ErrorExt for Error { | Error::ProcedureNotFound { .. } | Error::TooManyPartitions { .. } | Error::TomlFormat { .. } - | Error::LogicalTableCannotAddFollower { .. } - | Error::RegionFollowerLeaderConflict { .. } - | Error::MultipleRegionFollowersOnSameNode { .. } - | Error::RegionFollowerNotExists { .. } | Error::HandlerNotFound { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index dc275e32ae..e7a9edb563 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -55,8 +55,6 @@ use crate::lease::MetaPeerLookupService; use crate::metasrv::{ ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; -use crate::procedure::region_follower::manager::RegionFollowerManager; -use crate::procedure::region_follower::Context as ArfContext; use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; use crate::region::supervisor::{ @@ -345,19 +343,6 @@ impl MetasrvBuilder { .context(error::InitDdlManagerSnafu)?, ); - // alter region follower manager - let region_follower_manager = Arc::new(RegionFollowerManager::new( - procedure_manager.clone(), - ArfContext { - table_metadata_manager: table_metadata_manager.clone(), - mailbox: mailbox.clone(), - server_addr: options.server_addr.clone(), - cache_invalidator: cache_invalidator.clone(), - meta_peer_client: meta_peer_client.clone(), - }, - )); - region_follower_manager.try_start()?; - let handler_group_builder = match handler_group_builder { Some(handler_group_builder) => handler_group_builder, None => { diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index 9185fd05ab..5b4d247536 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -18,7 +18,6 @@ use common_meta::leadership_notifier::LeadershipChangeListener; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; -pub mod region_follower; pub mod region_migration; #[cfg(test)] mod test_util; diff --git a/src/meta-srv/src/procedure/region_follower/create.rs b/src/meta-srv/src/procedure/region_follower/create.rs deleted file mode 100644 index ee281614a8..0000000000 --- a/src/meta-srv/src/procedure/region_follower/create.rs +++ /dev/null @@ -1,252 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::{Duration, Instant}; - -use api::v1::meta::MailboxMessage; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; -use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; -use common_meta::key::datanode_table::RegionInfo; -use common_meta::peer::Peer; -use common_meta::RegionIdent; -use common_telemetry::info; -use snafu::ResultExt; -use store_api::storage::RegionId; - -use super::Context; -use crate::error::{self, Result}; -use crate::handler::HeartbeatMailbox; -use crate::service::mailbox::Channel; - -/// Uses lease time of a region as the timeout of opening a follower region. -const OPEN_REGION_FOLLOWER_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS); - -pub(crate) struct CreateFollower { - region_id: RegionId, - // The peer of the datanode to add region follower. - peer: Peer, -} - -impl CreateFollower { - pub fn new(region_id: RegionId, peer: Peer) -> Self { - Self { region_id, peer } - } - - /// Builds the open region instruction for the region follower. - pub(crate) async fn build_open_region_instruction( - &self, - region_info: RegionInfo, - ) -> Result { - let datanode_id = self.peer.id; - let table_id = self.region_id.table_id(); - let region_number = self.region_id.region_number(); - - let RegionInfo { - region_storage_path, - region_options, - region_wal_options, - engine, - } = region_info; - - let region_ident = RegionIdent { - datanode_id, - table_id, - region_number, - engine, - }; - - let open_instruction = Instruction::OpenRegion(OpenRegion::new( - region_ident, - ®ion_storage_path, - region_options, - region_wal_options, - true, - )); - - Ok(open_instruction) - } - - /// Sends the open region instruction to the datanode. - pub(crate) async fn send_open_region_instruction( - &self, - ctx: &Context, - instruction: Instruction, - ) -> Result<()> { - // TODO(jeremy): register the opening_region_keeper - let msg = MailboxMessage::json_message( - &format!("Open a follower region: {}", self.region_id), - &format!("Metasrv@{}", ctx.server_addr), - &format!("Datanode-{}@{}", self.peer.id, self.peer.addr), - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| error::SerializeToJsonSnafu { - input: instruction.to_string(), - })?; - - let ch = Channel::Datanode(self.peer.id); - let now = Instant::now(); - let receiver = ctx - .mailbox - .send(&ch, msg, OPEN_REGION_FOLLOWER_TIMEOUT) - .await?; - - match receiver.await? { - Ok(msg) => { - let reply = HeartbeatMailbox::json_reply(&msg)?; - info!( - "Received open region follower reply: {:?}, region: {}, elapsed: {:?}", - reply, - self.region_id, - now.elapsed() - ); - let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else { - return error::UnexpectedInstructionReplySnafu { - mailbox_message: msg.to_string(), - reason: "expect open region follower reply", - } - .fail(); - }; - - if result { - Ok(()) - } else { - error::RetryLaterSnafu { - reason: format!( - "Region {} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}", - self.region_id, - &self.peer, - now.elapsed() - ), - } - .fail() - } - } - Err(error::Error::MailboxTimeout { .. }) => { - let reason = format!( - "Mailbox received timeout for open region follower {} on datanode {:?}, elapsed: {:?}", - self.region_id, - &self.peer, - now.elapsed() - ); - error::RetryLaterSnafu { reason }.fail() - } - Err(e) => Err(e), - } - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use common_meta::DatanodeId; - - use super::*; - use crate::error::Error; - use crate::procedure::region_follower::test_util::TestingEnv; - use crate::procedure::test_util::{new_close_region_reply, send_mock_reply}; - - #[tokio::test] - async fn test_datanode_is_unreachable() { - let env = TestingEnv::new(); - let ctx = env.new_context(); - - let region_id = RegionId::new(1, 1); - let peer = Peer::new(1, "127.0.0.1:8080"); - let create_follower = CreateFollower::new(region_id, peer.clone()); - let instruction = mock_open_region_instruction(peer.id, region_id); - let err = create_follower - .send_open_region_instruction(&ctx, instruction) - .await - .unwrap_err(); - assert_matches!(err, Error::PusherNotFound { .. }); - assert!(!err.is_retryable()); - } - - #[tokio::test] - async fn test_unexpected_instruction_reply() { - let mut env = TestingEnv::new(); - let ctx = env.new_context(); - let mailbox_ctx = env.mailbox_context_mut(); - let mailbox = mailbox_ctx.mailbox().clone(); - - let region_id = RegionId::new(1, 1); - let peer = Peer::new(1, "127.0.0.1:8080"); - - let (tx, rx) = tokio::sync::mpsc::channel(1); - - mailbox_ctx - .insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx) - .await; - - // Sends an timeout error. - send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id))); - - let create_follower = CreateFollower::new(region_id, peer.clone()); - let instruction = mock_open_region_instruction(peer.id, region_id); - let err = create_follower - .send_open_region_instruction(&ctx, instruction) - .await - .unwrap_err(); - assert_matches!(err, Error::UnexpectedInstructionReply { .. }); - assert!(!err.is_retryable()); - } - - #[tokio::test] - async fn test_instruction_exceeded_deadline() { - let mut env = TestingEnv::new(); - let ctx = env.new_context(); - let mailbox_ctx = env.mailbox_context_mut(); - let mailbox = mailbox_ctx.mailbox().clone(); - - let region_id = RegionId::new(1, 1); - let peer = Peer::new(1, "127.0.0.1:8080"); - - let (tx, rx) = tokio::sync::mpsc::channel(1); - - mailbox_ctx - .insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx) - .await; - - // Sends an timeout error. - send_mock_reply(mailbox, rx, |id| { - Err(error::MailboxTimeoutSnafu { id }.build()) - }); - - let create_follower = CreateFollower::new(region_id, peer.clone()); - let instruction = mock_open_region_instruction(peer.id, region_id); - let err = create_follower - .send_open_region_instruction(&ctx, instruction) - .await - .unwrap_err(); - assert_matches!(err, Error::RetryLater { .. }); - assert!(err.is_retryable()); - } - - fn mock_open_region_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction { - Instruction::OpenRegion(OpenRegion { - region_ident: RegionIdent { - datanode_id, - table_id: region_id.table_id(), - region_number: region_id.region_number(), - engine: "mito2".to_string(), - }, - region_storage_path: "/tmp".to_string(), - region_options: Default::default(), - region_wal_options: Default::default(), - skip_wal_replay: true, - }) - } -} diff --git a/src/meta-srv/src/procedure/region_follower/manager.rs b/src/meta-srv/src/procedure/region_follower/manager.rs deleted file mode 100644 index 40f7da8efd..0000000000 --- a/src/meta-srv/src/procedure/region_follower/manager.rs +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_meta::rpc::procedure::{AddRegionFollowerRequest, RemoveRegionFollowerRequest}; -use common_procedure::{watcher, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId}; -use common_telemetry::info; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; -use table::table_name::TableName; - -use super::remove_region_follower::RemoveRegionFollowerProcedure; -use crate::error::{self, Result}; -use crate::procedure::region_follower::add_region_follower::AddRegionFollowerProcedure; -use crate::procedure::region_follower::Context; - -pub struct RegionFollowerManager { - procedure_manager: ProcedureManagerRef, - default_context: Context, -} - -impl RegionFollowerManager { - pub fn new(procedure_manager: ProcedureManagerRef, default_context: Context) -> Self { - Self { - procedure_manager, - default_context, - } - } - - pub fn new_context(&self) -> Context { - self.default_context.clone() - } - - pub(crate) fn try_start(&self) -> Result<()> { - // register add region follower procedure - let context = self.new_context(); - let type_name = AddRegionFollowerProcedure::TYPE_NAME; - self.procedure_manager - .register_loader( - type_name, - Box::new(move |json| { - let context = context.clone(); - AddRegionFollowerProcedure::from_json(json, context).map(|p| Box::new(p) as _) - }), - ) - .context(error::RegisterProcedureLoaderSnafu { type_name })?; - - // register remove region follower procedure - let context = self.new_context(); - let type_name = RemoveRegionFollowerProcedure::TYPE_NAME; - self.procedure_manager - .register_loader( - type_name, - Box::new(move |json| { - let context = context.clone(); - RemoveRegionFollowerProcedure::from_json(json, context) - .map(|p| Box::new(p) as _) - }), - ) - .context(error::RegisterProcedureLoaderSnafu { type_name })?; - Ok(()) - } - - pub async fn submit_add_follower_procedure( - &self, - req: AddRegionFollowerRequest, - ) -> Result<(ProcedureId, Option)> { - let AddRegionFollowerRequest { region_id, peer_id } = req; - let region_id = RegionId::from_u64(region_id); - let table_id = region_id.table_id(); - let ctx = self.new_context(); - - // get the table info - let table_info = ctx - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(error::TableMetadataManagerSnafu)? - .context(error::TableInfoNotFoundSnafu { table_id })? - .into_inner(); - - let TableName { - catalog_name, - schema_name, - .. - } = table_info.table_name(); - - let procedure = - AddRegionFollowerProcedure::new(catalog_name, schema_name, region_id, peer_id, ctx); - - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - let procedure_id = procedure_with_id.id; - info!("Starting add region follower procedure {procedure_id} for {req:?}"); - let mut watcher = self - .procedure_manager - .submit(procedure_with_id) - .await - .context(error::SubmitProcedureSnafu)?; - let output = watcher::wait(&mut watcher) - .await - .context(error::WaitProcedureSnafu)?; - - Ok((procedure_id, output)) - } - - pub async fn submit_remove_follower_procedure( - &self, - req: RemoveRegionFollowerRequest, - ) -> Result<(ProcedureId, Option)> { - let RemoveRegionFollowerRequest { region_id, peer_id } = req; - let region_id = RegionId::from_u64(region_id); - let table_id = region_id.table_id(); - let ctx = self.new_context(); - - // get the table info - let table_info = ctx - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(error::TableMetadataManagerSnafu)? - .context(error::TableInfoNotFoundSnafu { table_id })? - .into_inner(); - - let TableName { - catalog_name, - schema_name, - .. - } = table_info.table_name(); - - let procedure = - RemoveRegionFollowerProcedure::new(catalog_name, schema_name, region_id, peer_id, ctx); - - let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); - let procedure_id = procedure_with_id.id; - info!("Starting remove region follower procedure {procedure_id} for {req:?}"); - let mut watcher = self - .procedure_manager - .submit(procedure_with_id) - .await - .context(error::SubmitProcedureSnafu)?; - let output = watcher::wait(&mut watcher) - .await - .context(error::WaitProcedureSnafu)?; - - Ok((procedure_id, output)) - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use super::*; - use crate::procedure::region_follower::test_util::TestingEnv; - - #[tokio::test] - async fn test_submit_procedure_table_not_found() { - let env = TestingEnv::new(); - let ctx = env.new_context(); - let region_follower_manager = RegionFollowerManager::new(env.procedure_manager(), ctx); - let req = AddRegionFollowerRequest { - region_id: 1, - peer_id: 2, - }; - let err = region_follower_manager - .submit_add_follower_procedure(req) - .await - .unwrap_err(); - assert_matches!(err, error::Error::TableInfoNotFound { .. }); - - let req = RemoveRegionFollowerRequest { - region_id: 1, - peer_id: 2, - }; - let err = region_follower_manager - .submit_remove_follower_procedure(req) - .await - .unwrap_err(); - assert_matches!(err, error::Error::TableInfoNotFound { .. }); - } -} diff --git a/src/meta-srv/src/procedure/region_follower/remove.rs b/src/meta-srv/src/procedure/region_follower/remove.rs deleted file mode 100644 index c066812931..0000000000 --- a/src/meta-srv/src/procedure/region_follower/remove.rs +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::{Duration, Instant}; - -use api::v1::meta::MailboxMessage; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; -use common_meta::key::datanode_table::RegionInfo; -use common_meta::peer::Peer; -use common_meta::RegionIdent; -use common_telemetry::info; -use snafu::ResultExt; -use store_api::storage::RegionId; - -use super::Context; -use crate::error::{self, Result}; -use crate::handler::HeartbeatMailbox; -use crate::service::mailbox::Channel; - -/// Uses lease time of a region as the timeout of closing a follower region. -const CLOSE_REGION_FOLLOWER_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS); - -pub(crate) struct RemoveFollower { - region_id: RegionId, - // The peer of the datanode to add region follower. - peer: Peer, -} - -impl RemoveFollower { - pub fn new(region_id: RegionId, peer: Peer) -> Self { - Self { region_id, peer } - } - - /// Builds the close region instruction for the region follower. - pub(crate) async fn build_close_region_instruction( - &self, - region_info: RegionInfo, - ) -> Result { - let datanode_id = self.peer.id; - let table_id = self.region_id.table_id(); - let region_number = self.region_id.region_number(); - - let RegionInfo { engine, .. } = region_info; - - let region_ident = RegionIdent { - datanode_id, - table_id, - region_number, - engine, - }; - - let close_instruction = Instruction::CloseRegion(region_ident); - - Ok(close_instruction) - } - - /// Sends the close region instruction to the datanode. - pub(crate) async fn send_close_region_instruction( - &self, - ctx: &Context, - instruction: Instruction, - ) -> Result<()> { - let msg = MailboxMessage::json_message( - &format!("Close a follower region: {}", self.region_id), - &format!("Metasrv@{}", ctx.server_addr), - &format!("Datanode-{}@{}", self.peer.id, self.peer.addr), - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| error::SerializeToJsonSnafu { - input: instruction.to_string(), - })?; - - let ch = Channel::Datanode(self.peer.id); - let now = Instant::now(); - let receiver = ctx - .mailbox - .send(&ch, msg, CLOSE_REGION_FOLLOWER_TIMEOUT) - .await?; - - match receiver.await? { - Ok(msg) => { - let reply = HeartbeatMailbox::json_reply(&msg)?; - info!( - "Received close region follower reply: {:?}, region: {}, elapsed: {:?}", - reply, - self.region_id, - now.elapsed() - ); - let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else { - return error::UnexpectedInstructionReplySnafu { - mailbox_message: msg.to_string(), - reason: "expect close region follower reply", - } - .fail(); - }; - - if result { - Ok(()) - } else { - error::RetryLaterSnafu { - reason: format!( - "Region {} is not closed by datanode {:?}, error: {error:?}, elapsed: {:?}", - self.region_id, - &self.peer, - now.elapsed() - ), - } - .fail() - } - } - Err(error::Error::MailboxTimeout { .. }) => { - let reason = format!( - "Mailbox received timeout for close region follower {} on datanode {:?}, elapsed: {:?}", - self.region_id, - &self.peer, - now.elapsed() - ); - error::RetryLaterSnafu { reason }.fail() - } - Err(e) => Err(e), - } - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use common_meta::DatanodeId; - - use super::*; - use crate::error::Error; - use crate::procedure::region_follower::test_util::TestingEnv; - use crate::procedure::test_util::{new_open_region_reply, send_mock_reply}; - - #[tokio::test] - async fn test_datanode_is_unreachable() { - let env = TestingEnv::new(); - let ctx = env.new_context(); - - let region_id = RegionId::new(1, 1); - let peer = Peer::new(1, "127.0.0.1:8080"); - let remove_follower = RemoveFollower::new(region_id, peer.clone()); - let instruction = mock_close_region_instruction(peer.id, region_id); - let err = remove_follower - .send_close_region_instruction(&ctx, instruction) - .await - .unwrap_err(); - assert_matches!(err, Error::PusherNotFound { .. }); - assert!(!err.is_retryable()); - } - - #[tokio::test] - async fn test_unexpected_instruction_reply() { - let mut env = TestingEnv::new(); - let ctx = env.new_context(); - let mailbox_ctx = env.mailbox_context_mut(); - let mailbox = mailbox_ctx.mailbox().clone(); - - let region_id = RegionId::new(1, 1); - let peer = Peer::new(1, "127.0.0.1:8080"); - - let (tx, rx) = tokio::sync::mpsc::channel(1); - - mailbox_ctx - .insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx) - .await; - - // Sends an timeout error. - send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, false, None))); - - let remove_follower = RemoveFollower::new(region_id, peer.clone()); - let instruction = mock_close_region_instruction(peer.id, region_id); - let err = remove_follower - .send_close_region_instruction(&ctx, instruction) - .await - .unwrap_err(); - assert_matches!(err, Error::UnexpectedInstructionReply { .. }); - assert!(!err.is_retryable()); - } - - #[tokio::test] - async fn test_instruction_exceeded_deadline() { - let mut env = TestingEnv::new(); - let ctx = env.new_context(); - let mailbox_ctx = env.mailbox_context_mut(); - let mailbox = mailbox_ctx.mailbox().clone(); - - let region_id = RegionId::new(1, 1); - let peer = Peer::new(1, "127.0.0.1:8080"); - - let (tx, rx) = tokio::sync::mpsc::channel(1); - - mailbox_ctx - .insert_heartbeat_response_receiver(Channel::Datanode(peer.id), tx) - .await; - - // Sends an timeout error. - send_mock_reply(mailbox, rx, |id| { - Err(error::MailboxTimeoutSnafu { id }.build()) - }); - - let remove_follower = RemoveFollower::new(region_id, peer.clone()); - let instruction = mock_close_region_instruction(peer.id, region_id); - let err = remove_follower - .send_close_region_instruction(&ctx, instruction) - .await - .unwrap_err(); - assert_matches!(err, Error::RetryLater { .. }); - assert!(err.is_retryable()); - } - - fn mock_close_region_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction { - Instruction::CloseRegion(RegionIdent { - datanode_id, - table_id: region_id.table_id(), - region_number: region_id.region_number(), - engine: "mito2".to_string(), - }) - } -} diff --git a/src/meta-srv/src/procedure/region_follower/test_util.rs b/src/meta-srv/src/procedure/region_follower/test_util.rs deleted file mode 100644 index 69e7b42583..0000000000 --- a/src/meta-srv/src/procedure/region_follower/test_util.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; -use common_meta::kv_backend::memory::MemoryKvBackend; -use common_meta::kv_backend::ResettableKvBackendRef; -use common_meta::sequence::SequenceBuilder; -use common_meta::state_store::KvStateStore; -use common_procedure::local::{LocalManager, ManagerConfig}; -use common_procedure::ProcedureManagerRef; - -use super::Context; -use crate::cache_invalidator::MetasrvCacheInvalidator; -use crate::cluster::MetaPeerClientBuilder; -use crate::metasrv::MetasrvInfo; -use crate::procedure::test_util::MailboxContext; - -/// `TestingEnv` provides components during the tests. -pub struct TestingEnv { - table_metadata_manager: TableMetadataManagerRef, - mailbox_ctx: MailboxContext, - server_addr: String, - procedure_manager: ProcedureManagerRef, - in_memory: ResettableKvBackendRef, -} - -impl Default for TestingEnv { - fn default() -> Self { - Self::new() - } -} - -impl TestingEnv { - pub fn new() -> Self { - let kv_backend = Arc::new(MemoryKvBackend::new()); - let in_memory = Arc::new(MemoryKvBackend::new()); - let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); - - let mailbox_sequence = - SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build(); - - let mailbox_ctx = MailboxContext::new(mailbox_sequence); - - let state_store = Arc::new(KvStateStore::new(kv_backend)); - let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); - - Self { - table_metadata_manager, - mailbox_ctx, - server_addr: "localhost".to_string(), - procedure_manager, - in_memory, - } - } - - pub fn new_context(&self) -> Context { - Context { - table_metadata_manager: self.table_metadata_manager.clone(), - mailbox: self.mailbox_ctx.mailbox().clone(), - server_addr: self.server_addr.clone(), - cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( - self.mailbox_ctx.mailbox().clone(), - MetasrvInfo { - server_addr: self.server_addr.to_string(), - }, - )), - meta_peer_client: MetaPeerClientBuilder::default() - .election(None) - .in_memory(self.in_memory.clone()) - .build() - .map(Arc::new) - // Safety: all required fields set at initialization - .unwrap(), - } - } - - pub fn mailbox_context_mut(&mut self) -> &mut MailboxContext { - &mut self.mailbox_ctx - } - - pub fn procedure_manager(&self) -> ProcedureManagerRef { - self.procedure_manager.clone() - } -}