From 328ec56b6331455b7933f70da548bcb130af9c9a Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 22 Oct 2025 11:43:47 +0800 Subject: [PATCH] feat: introduce `OpenRegions` and `CloseRegions` instructions to support batch region operations (#7122) * feat: introduce `OpenRegions` and `CloseRegions` instructions to support batch region operations Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * feat: merge instructions Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- .../meta/src/heartbeat/handler/tests.rs | 2 +- src/common/meta/src/instruction.rs | 126 ++++++++++++--- src/datanode/src/heartbeat.rs | 5 +- src/datanode/src/heartbeat/handler.rs | 69 +++++--- .../src/heartbeat/handler/close_region.rs | 136 +++++++++++++--- .../src/heartbeat/handler/open_region.rs | 149 +++++++++++++++--- .../close_downgraded_region.rs | 6 +- .../region_migration/open_candidate_region.rs | 10 +- src/meta-srv/src/procedure/test_util.rs | 9 +- 9 files changed, 410 insertions(+), 102 deletions(-) diff --git a/src/common/meta/src/heartbeat/handler/tests.rs b/src/common/meta/src/heartbeat/handler/tests.rs index 3313efe6f1..5a8cd6971e 100644 --- a/src/common/meta/src/heartbeat/handler/tests.rs +++ b/src/common/meta/src/heartbeat/handler/tests.rs @@ -24,7 +24,7 @@ async fn test_heartbeat_mailbox() { let mailbox = HeartbeatMailbox::new(tx); let meta = MessageMeta::new_test(1, "test", "foo", "bar"); - let reply = InstructionReply::OpenRegion(SimpleReply { + let reply = InstructionReply::OpenRegions(SimpleReply { result: true, error: None, }); diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 4781911322..9a9d955f58 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::time::Duration; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use store_api::storage::{RegionId, RegionNumber}; use strum::Display; use table::metadata::TableId; @@ -394,16 +394,33 @@ impl From for FlushRegions { } } +#[derive(Debug, Deserialize)] +#[serde(untagged)] +enum SingleOrMultiple { + Single(T), + Multiple(Vec), +} + +fn single_or_multiple_from<'de, D, T>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, + T: Deserialize<'de>, +{ + let helper = SingleOrMultiple::::deserialize(deserializer)?; + Ok(match helper { + SingleOrMultiple::Single(x) => vec![x], + SingleOrMultiple::Multiple(xs) => xs, + }) +} + #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] pub enum Instruction { - /// Opens a region. - /// - /// - Returns true if a specified region exists. - OpenRegion(OpenRegion), - /// Closes a region. - /// - /// - Returns true if a specified region does not exist. - CloseRegion(RegionIdent), + /// Opens regions. + #[serde(deserialize_with = "single_or_multiple_from", alias = "OpenRegion")] + OpenRegions(Vec), + /// Closes regions. + #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")] + CloseRegions(Vec), /// Upgrades a region. UpgradeRegion(UpgradeRegion), /// Downgrades a region. @@ -438,8 +455,10 @@ impl Display for UpgradeRegionReply { #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(tag = "type", rename_all = "snake_case")] pub enum InstructionReply { - OpenRegion(SimpleReply), - CloseRegion(SimpleReply), + #[serde(alias = "open_region")] + OpenRegions(SimpleReply), + #[serde(alias = "close_region")] + CloseRegions(SimpleReply), UpgradeRegion(UpgradeRegionReply), DowngradeRegion(DowngradeRegionReply), FlushRegions(FlushRegionReply), @@ -448,8 +467,8 @@ pub enum InstructionReply { impl Display for InstructionReply { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply), - Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply), + Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply), + Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply), Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply), Self::DowngradeRegion(reply) => { write!(f, "InstructionReply::DowngradeRegion({})", reply) @@ -459,13 +478,30 @@ impl Display for InstructionReply { } } +#[cfg(any(test, feature = "testing"))] +impl InstructionReply { + pub fn expect_close_regions_reply(self) -> SimpleReply { + match self { + Self::CloseRegions(reply) => reply, + _ => panic!("Expected CloseRegions reply"), + } + } + + pub fn expect_open_regions_reply(self) -> SimpleReply { + match self { + Self::OpenRegions(reply) => reply, + _ => panic!("Expected OpenRegions reply"), + } + } +} + #[cfg(test)] mod tests { use super::*; #[test] fn test_serialize_instruction() { - let open_region = Instruction::OpenRegion(OpenRegion::new( + let open_region = Instruction::OpenRegions(vec![OpenRegion::new( RegionIdent { datanode_id: 2, table_id: 1024, @@ -476,30 +512,78 @@ mod tests { HashMap::new(), HashMap::new(), false, - )); + )]); let serialized = serde_json::to_string(&open_region).unwrap(); - assert_eq!( - r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#, + r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#, serialized ); - let close_region = Instruction::CloseRegion(RegionIdent { + let close_region = Instruction::CloseRegions(vec![RegionIdent { datanode_id: 2, table_id: 1024, region_number: 1, engine: "mito2".to_string(), - }); + }]); let serialized = serde_json::to_string(&close_region).unwrap(); - assert_eq!( - r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#, + r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#, serialized ); } + #[test] + fn test_deserialize_instruction() { + let open_region_instruction = r#"{"OpenRegion":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#; + let open_region_instruction: Instruction = + serde_json::from_str(open_region_instruction).unwrap(); + let open_region = Instruction::OpenRegions(vec![OpenRegion::new( + RegionIdent { + datanode_id: 2, + table_id: 1024, + region_number: 1, + engine: "mito2".to_string(), + }, + "test/foo", + HashMap::new(), + HashMap::new(), + false, + )]); + assert_eq!(open_region_instruction, open_region); + + let close_region_instruction = r#"{"CloseRegion":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#; + let close_region_instruction: Instruction = + serde_json::from_str(close_region_instruction).unwrap(); + let close_region = Instruction::CloseRegions(vec![RegionIdent { + datanode_id: 2, + table_id: 1024, + region_number: 1, + engine: "mito2".to_string(), + }]); + assert_eq!(close_region_instruction, close_region); + + let close_region_instruction_reply = + r#"{"result":true,"error":null,"type":"close_region"}"#; + let close_region_instruction_reply: InstructionReply = + serde_json::from_str(close_region_instruction_reply).unwrap(); + let close_region_reply = InstructionReply::CloseRegions(SimpleReply { + result: true, + error: None, + }); + assert_eq!(close_region_instruction_reply, close_region_reply); + + let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#; + let open_region_instruction_reply: InstructionReply = + serde_json::from_str(open_region_instruction_reply).unwrap(); + let open_region_reply = InstructionReply::OpenRegions(SimpleReply { + result: true, + error: None, + }); + assert_eq!(open_region_instruction_reply, open_region_reply); + } + #[derive(Debug, Clone, Serialize, Deserialize)] struct LegacyOpenRegion { region_ident: RegionIdent, diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index bb3f25957c..9c059e5698 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -90,7 +90,10 @@ impl HeartbeatTask { let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![ region_alive_keeper.clone(), Arc::new(ParseMailboxMessageHandler), - Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())), + Arc::new( + RegionHeartbeatResponseHandler::new(region_server.clone()) + .with_open_region_parallelism(opts.init_regions_parallelism), + ), Arc::new(InvalidateCacheHandler::new(cache_invalidator)), ])); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 165b8e057b..14a671a14b 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -40,6 +40,7 @@ pub struct RegionHeartbeatResponseHandler { catchup_tasks: TaskTracker<()>, downgrade_tasks: TaskTracker<()>, flush_tasks: TaskTracker<()>, + open_region_parallelism: usize, } /// Handler of the instruction. @@ -78,17 +79,29 @@ impl RegionHeartbeatResponseHandler { catchup_tasks: TaskTracker::new(), downgrade_tasks: TaskTracker::new(), flush_tasks: TaskTracker::new(), + // Default to half of the number of CPUs. + open_region_parallelism: (num_cpus::get() / 2).max(1), } } + /// Sets the parallelism for opening regions. + pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self { + self.open_region_parallelism = parallelism; + self + } + /// Builds the [InstructionHandler]. - fn build_handler(instruction: Instruction) -> MetaResult { + fn build_handler(&self, instruction: Instruction) -> MetaResult { match instruction { - Instruction::OpenRegion(open_region) => Ok(Box::new(move |handler_context| { - handler_context.handle_open_region_instruction(open_region) - })), - Instruction::CloseRegion(close_region) => Ok(Box::new(|handler_context| { - handler_context.handle_close_region_instruction(close_region) + Instruction::OpenRegions(open_regions) => { + let open_region_parallelism = self.open_region_parallelism; + Ok(Box::new(move |handler_context| { + handler_context + .handle_open_regions_instruction(open_regions, open_region_parallelism) + })) + } + Instruction::CloseRegions(close_regions) => Ok(Box::new(move |handler_context| { + handler_context.handle_close_regions_instruction(close_regions) })), Instruction::DowngradeRegion(downgrade_region) => { Ok(Box::new(move |handler_context| { @@ -109,14 +122,22 @@ impl RegionHeartbeatResponseHandler { #[async_trait] impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { - matches!( - ctx.incoming_message.as_ref(), - Some((_, Instruction::OpenRegion { .. })) - | Some((_, Instruction::CloseRegion { .. })) - | Some((_, Instruction::DowngradeRegion { .. })) - | Some((_, Instruction::UpgradeRegion { .. })) - | Some((_, Instruction::FlushRegions { .. })) - ) + matches!(ctx.incoming_message.as_ref(), |Some(( + _, + Instruction::DowngradeRegion { .. }, + ))| Some(( + _, + Instruction::UpgradeRegion { .. } + )) | Some(( + _, + Instruction::FlushRegions { .. } + )) | Some(( + _, + Instruction::OpenRegions { .. } + )) | Some(( + _, + Instruction::CloseRegions { .. } + ))) } async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { @@ -130,7 +151,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { let catchup_tasks = self.catchup_tasks.clone(); let downgrade_tasks = self.downgrade_tasks.clone(); let flush_tasks = self.flush_tasks.clone(); - let handler = Self::build_handler(instruction)?; + let handler = self.build_handler(instruction)?; let _handle = common_runtime::spawn_global(async move { let reply = handler(HandlerContext { region_server, @@ -176,8 +197,8 @@ mod tests { use crate::tests::mock_region_server; pub struct HeartbeatResponseTestEnv { - mailbox: MailboxRef, - receiver: Receiver<(MessageMeta, InstructionReply)>, + pub(crate) mailbox: MailboxRef, + pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>, } impl HeartbeatResponseTestEnv { @@ -248,16 +269,16 @@ mod tests { } fn close_region_instruction(region_id: RegionId) -> Instruction { - Instruction::CloseRegion(RegionIdent { + Instruction::CloseRegions(vec![RegionIdent { table_id: region_id.table_id(), region_number: region_id.region_number(), datanode_id: 2, engine: MITO_ENGINE_NAME.to_string(), - }) + }]) } fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction { - Instruction::OpenRegion(OpenRegion::new( + Instruction::OpenRegions(vec![OpenRegion::new( RegionIdent { table_id: region_id.table_id(), region_number: region_id.region_number(), @@ -268,7 +289,7 @@ mod tests { HashMap::new(), HashMap::new(), false, - )) + )]) } #[tokio::test] @@ -303,7 +324,7 @@ mod tests { let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); - if let InstructionReply::CloseRegion(reply) = reply { + if let InstructionReply::CloseRegions(reply) = reply { assert!(reply.result); assert!(reply.error.is_none()); } else { @@ -358,7 +379,7 @@ mod tests { let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); - if let InstructionReply::OpenRegion(reply) = reply { + if let InstructionReply::OpenRegions(reply) = reply { assert!(reply.result); assert!(reply.error.is_none()); } else { @@ -391,7 +412,7 @@ mod tests { let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); - if let InstructionReply::OpenRegion(reply) = reply { + if let InstructionReply::OpenRegions(reply) = reply { assert!(!reply.result); assert!(reply.error.is_some()); } else { diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index da83786a7b..c942642731 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -14,7 +14,8 @@ use common_meta::RegionIdent; use common_meta::instruction::{InstructionReply, SimpleReply}; -use common_telemetry::{tracing, warn}; +use common_telemetry::warn; +use futures::future::join_all; use futures_util::future::BoxFuture; use store_api::region_request::{RegionCloseRequest, RegionRequest}; @@ -22,35 +23,124 @@ use crate::error; use crate::heartbeat::handler::HandlerContext; impl HandlerContext { - #[tracing::instrument(skip_all)] - pub(crate) fn handle_close_region_instruction( + pub(crate) fn handle_close_regions_instruction( self, - region_ident: RegionIdent, + region_idents: Vec, ) -> BoxFuture<'static, Option> { Box::pin(async move { - let region_id = Self::region_ident_to_region_id(®ion_ident); - let request = RegionRequest::Close(RegionCloseRequest {}); - let result = self.region_server.handle_request(region_id, request).await; + let region_ids = region_idents + .into_iter() + .map(|region_ident| Self::region_ident_to_region_id(®ion_ident)) + .collect::>(); - match result { - Ok(_) => Some(InstructionReply::CloseRegion(SimpleReply { + let futs = region_ids.iter().map(|region_id| { + self.region_server + .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {})) + }); + + let results = join_all(futs).await; + + let mut errors = vec![]; + for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) { + match result { + Ok(_) => (), + Err(error::Error::RegionNotFound { .. }) => { + warn!( + "Received a close regions instruction from meta, but target region:{} is not found.", + region_id + ); + } + Err(err) => errors.push(format!("region:{region_id}: {err:?}")), + } + } + + if errors.is_empty() { + return Some(InstructionReply::CloseRegions(SimpleReply { result: true, error: None, - })), - Err(error::Error::RegionNotFound { .. }) => { - warn!( - "Received a close region instruction from meta, but target region:{region_id} is not found." - ); - Some(InstructionReply::CloseRegion(SimpleReply { - result: true, - error: None, - })) - } - Err(err) => Some(InstructionReply::CloseRegion(SimpleReply { - result: false, - error: Some(format!("{err:?}")), - })), + })); } + + Some(InstructionReply::CloseRegions(SimpleReply { + result: false, + error: Some(errors.join("; ")), + })) }) } } + +#[cfg(test)] +mod tests { + use std::assert_matches; + use std::sync::Arc; + + use assert_matches::assert_matches; + use common_meta::RegionIdent; + use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; + use common_meta::heartbeat::mailbox::MessageMeta; + use common_meta::instruction::Instruction; + use mito2::config::MitoConfig; + use mito2::engine::MITO_ENGINE_NAME; + use mito2::test_util::{CreateRequestBuilder, TestEnv}; + use store_api::region_request::RegionRequest; + use store_api::storage::RegionId; + + use crate::heartbeat::handler::RegionHeartbeatResponseHandler; + use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv; + use crate::tests::mock_region_server; + + fn close_regions_instruction(region_ids: impl IntoIterator) -> Instruction { + let region_idents = region_ids + .into_iter() + .map(|region_id| RegionIdent { + table_id: region_id.table_id(), + region_number: region_id.region_number(), + datanode_id: 2, + engine: MITO_ENGINE_NAME.to_string(), + }) + .collect(); + + Instruction::CloseRegions(region_idents) + } + + #[tokio::test] + async fn test_close_regions() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let mut engine_env = TestEnv::with_prefix("close-regions").await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine.clone())); + let region_id = RegionId::new(1024, 1); + let region_id1 = RegionId::new(1024, 2); + + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + + let create_req1 = builder.build(); + region_server + .handle_request(region_id1, RegionRequest::Create(create_req1)) + .await + .unwrap(); + let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0"); + let instruction = + close_regions_instruction([region_id, region_id1, RegionId::new(1024, 3)]); + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Continue); + + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + let reply = reply.expect_close_regions_reply(); + assert!(reply.result); + assert!(reply.error.is_none()); + assert!(!engine.is_region_exists(region_id)); + assert!(!engine.is_region_exists(region_id1)); + assert!(!engine.is_region_exists(RegionId::new(1024, 3))); + } +} diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index ed5ce72a4d..e6ea973eec 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -16,39 +16,146 @@ use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply}; use common_meta::wal_options_allocator::prepare_wal_options; use futures_util::future::BoxFuture; use store_api::path_utils::table_dir; -use store_api::region_request::{PathType, RegionOpenRequest, RegionRequest}; +use store_api::region_request::{PathType, RegionOpenRequest}; use crate::heartbeat::handler::HandlerContext; impl HandlerContext { - pub(crate) fn handle_open_region_instruction( + pub(crate) fn handle_open_regions_instruction( self, - OpenRegion { - region_ident, - region_storage_path, - mut region_options, - region_wal_options, - skip_wal_replay, - }: OpenRegion, + open_regions: Vec, + open_region_parallelism: usize, ) -> BoxFuture<'static, Option> { Box::pin(async move { - let region_id = Self::region_ident_to_region_id(®ion_ident); - prepare_wal_options(&mut region_options, region_id, ®ion_wal_options); - let request = RegionRequest::Open(RegionOpenRequest { - engine: region_ident.engine, - table_dir: table_dir(®ion_storage_path, region_id.table_id()), - path_type: PathType::Bare, - options: region_options, - skip_wal_replay, - checkpoint: None, - }); - let result = self.region_server.handle_request(region_id, request).await; + let requests = open_regions + .into_iter() + .map(|open_region| { + let OpenRegion { + region_ident, + region_storage_path, + mut region_options, + region_wal_options, + skip_wal_replay, + } = open_region; + let region_id = Self::region_ident_to_region_id(®ion_ident); + prepare_wal_options(&mut region_options, region_id, ®ion_wal_options); + let request = RegionOpenRequest { + engine: region_ident.engine, + table_dir: table_dir(®ion_storage_path, region_id.table_id()), + path_type: PathType::Bare, + options: region_options, + skip_wal_replay, + checkpoint: None, + }; + (region_id, request) + }) + .collect::>(); + + let result = self + .region_server + .handle_batch_open_requests(open_region_parallelism, requests, false) + .await; let success = result.is_ok(); let error = result.as_ref().map_err(|e| format!("{e:?}")).err(); - Some(InstructionReply::OpenRegion(SimpleReply { + Some(InstructionReply::OpenRegions(SimpleReply { result: success, error, })) }) } } + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::collections::HashMap; + use std::sync::Arc; + + use common_meta::RegionIdent; + use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; + use common_meta::heartbeat::mailbox::MessageMeta; + use common_meta::instruction::{Instruction, OpenRegion}; + use mito2::config::MitoConfig; + use mito2::engine::MITO_ENGINE_NAME; + use mito2::test_util::{CreateRequestBuilder, TestEnv}; + use store_api::path_utils::table_dir; + use store_api::region_request::{RegionCloseRequest, RegionRequest}; + use store_api::storage::RegionId; + + use crate::heartbeat::handler::RegionHeartbeatResponseHandler; + use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv; + use crate::tests::mock_region_server; + + fn open_regions_instruction( + region_ids: impl IntoIterator, + storage_path: &str, + ) -> Instruction { + let region_idents = region_ids + .into_iter() + .map(|region_id| OpenRegion { + region_ident: RegionIdent { + datanode_id: 0, + table_id: region_id.table_id(), + region_number: region_id.region_number(), + engine: MITO_ENGINE_NAME.to_string(), + }, + region_storage_path: storage_path.to_string(), + region_options: HashMap::new(), + region_wal_options: HashMap::new(), + skip_wal_replay: false, + }) + .collect(); + + Instruction::OpenRegions(region_idents) + } + + #[tokio::test] + async fn test_open_regions() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let mut engine_env = TestEnv::with_prefix("open-regions").await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine.clone())); + let region_id = RegionId::new(1024, 1); + let region_id1 = RegionId::new(1024, 2); + let storage_path = "test"; + let builder = CreateRequestBuilder::new(); + let mut create_req = builder.build(); + create_req.table_dir = table_dir(storage_path, region_id.table_id()); + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + let mut create_req1 = builder.build(); + create_req1.table_dir = table_dir(storage_path, region_id1.table_id()); + region_server + .handle_request(region_id1, RegionRequest::Create(create_req1)) + .await + .unwrap(); + region_server + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + region_server + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + .unwrap(); + + let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0"); + let instruction = open_regions_instruction([region_id, region_id1], storage_path); + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); + assert_matches!(control, HandleControl::Continue); + let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); + + let reply = reply.expect_open_regions_reply(); + assert!(reply.result); + assert!(reply.error.is_none()); + + assert!(engine.is_region_exists(region_id)); + assert!(engine.is_region_exists(region_id1)); + } +} diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs index 3f488a9f86..5a8beb7ca4 100644 --- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -80,12 +80,12 @@ impl CloseDowngradedRegion { let RegionInfo { engine, .. } = datanode_table_value.region_info.clone(); - Ok(Instruction::CloseRegion(RegionIdent { + Ok(Instruction::CloseRegions(vec![RegionIdent { datanode_id: downgrade_leader_datanode_id, table_id, region_number, engine, - })) + }])) } /// Closes the downgraded leader region. @@ -121,7 +121,7 @@ impl CloseDowngradedRegion { "Received close downgraded leade region reply: {:?}, region: {}", reply, region_id ); - let InstructionReply::CloseRegion(SimpleReply { result, error }) = reply else { + let InstructionReply::CloseRegions(SimpleReply { result, error }) = reply else { return error::UnexpectedInstructionReplySnafu { mailbox_message: msg.to_string(), reason: "expect close region reply", diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 7f5098aa6b..f3e767cfd9 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -78,7 +78,7 @@ impl OpenCandidateRegion { engine, } = datanode_table_value.region_info.clone(); - let open_instruction = Instruction::OpenRegion(OpenRegion::new( + let open_instruction = Instruction::OpenRegions(vec![OpenRegion::new( RegionIdent { datanode_id: candidate_id, table_id, @@ -89,7 +89,7 @@ impl OpenCandidateRegion { region_options, region_wal_options, true, - )); + )]); Ok(open_instruction) } @@ -155,7 +155,7 @@ impl OpenCandidateRegion { region_id, now.elapsed() ); - let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else { + let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else { return error::UnexpectedInstructionReplySnafu { mailbox_message: msg.to_string(), reason: "expect open region reply", @@ -215,7 +215,7 @@ mod tests { } fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction { - Instruction::OpenRegion(OpenRegion { + Instruction::OpenRegions(vec![OpenRegion { region_ident: RegionIdent { datanode_id, table_id: region_id.table_id(), @@ -226,7 +226,7 @@ mod tests { region_options: Default::default(), region_wal_options: Default::default(), skip_wal_replay: true, - }) + }]) } #[tokio::test] diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index d822ae845c..8197087351 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -95,8 +95,11 @@ pub fn new_open_region_reply(id: u64, result: bool, error: Option) -> Ma to: "meta".to_string(), timestamp_millis: current_time_millis(), payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::OpenRegion(SimpleReply { result, error })) - .unwrap(), + serde_json::to_string(&InstructionReply::OpenRegions(SimpleReply { + result, + error, + })) + .unwrap(), )), } } @@ -157,7 +160,7 @@ pub fn new_close_region_reply(id: u64) -> MailboxMessage { to: "meta".to_string(), timestamp_millis: current_time_millis(), payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::CloseRegion(SimpleReply { + serde_json::to_string(&InstructionReply::CloseRegions(SimpleReply { result: false, error: None, }))