From 6960a0183ae75f7526cdc5e1b803e22382804db5 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 31 Oct 2025 11:08:38 +0800 Subject: [PATCH] refactor: add support for batch region upgrade operations part1 (#7155) * refactor: convert UpgradeRegion instruction to batch operation Signed-off-by: WenyXu * feat: introduce `handle_batch_catchup_requests` fn for mito engine Signed-off-by: WenyXu * test: add tests Signed-off-by: WenyXu * feat: introduce `handle_batch_catchup_requests` fn for metric engine Signed-off-by: WenyXu * chore: suggestion and add ser/de tests Signed-off-by: WenyXu * chore: add comments Signed-off-by: WenyXu * fix: fix unit tests Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/instruction.rs | 145 ++++++++- src/datanode/src/heartbeat/handler.rs | 8 +- .../src/heartbeat/handler/upgrade_region.rs | 287 ++++++++++-------- .../upgrade_candidate_region.rs | 21 +- src/meta-srv/src/procedure/test_util.rs | 15 +- src/metric-engine/src/engine.rs | 15 +- src/metric-engine/src/engine/catchup.rs | 97 +++++- src/metric-engine/src/engine/open.rs | 17 +- src/metric-engine/src/error.rs | 10 +- src/mito2/src/engine.rs | 147 ++++++++- src/mito2/src/engine/batch_catchup_test.rs | 239 +++++++++++++++ src/mito2/src/request.rs | 20 +- src/mito2/src/worker.rs | 5 +- src/mito2/src/worker/handle_catchup.rs | 7 +- src/store-api/src/logstore/provider.rs | 4 +- src/store-api/src/region_engine.rs | 28 +- 16 files changed, 887 insertions(+), 178 deletions(-) create mode 100644 src/mito2/src/engine/batch_catchup_test.rs diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index d8e5affe30..e25b0e350d 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -507,13 +507,14 @@ pub enum Instruction { /// Closes regions. #[serde(deserialize_with = "single_or_multiple_from", alias = "CloseRegion")] CloseRegions(Vec), - /// Upgrades a region. - UpgradeRegion(UpgradeRegion), + /// Upgrades regions. + #[serde(deserialize_with = "single_or_multiple_from", alias = "UpgradeRegion")] + UpgradeRegions(Vec), #[serde( deserialize_with = "single_or_multiple_from", alias = "DowngradeRegion" )] - /// Downgrades a region. + /// Downgrades regions. DowngradeRegions(Vec), /// Invalidates batch cache. InvalidateCaches(Vec), @@ -559,9 +560,9 @@ impl Instruction { } /// Converts the instruction into a [UpgradeRegion]. - pub fn into_upgrade_regions(self) -> Option { + pub fn into_upgrade_regions(self) -> Option> { match self { - Self::UpgradeRegion(upgrade_region) => Some(upgrade_region), + Self::UpgradeRegions(upgrade_region) => Some(upgrade_region), _ => None, } } @@ -584,6 +585,10 @@ impl Instruction { /// The reply of [UpgradeRegion]. #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] pub struct UpgradeRegionReply { + /// The [RegionId]. + /// For compatibility, it is defaulted to [RegionId::new(0, 0)]. + #[serde(default)] + pub region_id: RegionId, /// Returns true if `last_entry_id` has been replayed to the latest. pub ready: bool, /// Indicates whether the region exists. @@ -635,6 +640,39 @@ where }) } +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct UpgradeRegionsReply { + pub replies: Vec, +} + +impl UpgradeRegionsReply { + pub fn new(replies: Vec) -> Self { + Self { replies } + } + + pub fn single(reply: UpgradeRegionReply) -> Self { + Self::new(vec![reply]) + } +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum UpgradeRegionsCompat { + Single(UpgradeRegionReply), + Multiple(UpgradeRegionsReply), +} + +fn upgrade_regions_compat_from<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let helper = UpgradeRegionsCompat::deserialize(deserializer)?; + Ok(match helper { + UpgradeRegionsCompat::Single(x) => UpgradeRegionsReply::new(vec![x]), + UpgradeRegionsCompat::Multiple(reply) => reply, + }) +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(tag = "type", rename_all = "snake_case")] pub enum InstructionReply { @@ -642,7 +680,11 @@ pub enum InstructionReply { OpenRegions(SimpleReply), #[serde(alias = "close_region")] CloseRegions(SimpleReply), - UpgradeRegion(UpgradeRegionReply), + #[serde( + deserialize_with = "upgrade_regions_compat_from", + alias = "upgrade_region" + )] + UpgradeRegions(UpgradeRegionsReply), #[serde( alias = "downgrade_region", deserialize_with = "downgrade_regions_compat_from" @@ -658,9 +700,11 @@ impl Display for InstructionReply { match self { Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply), Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply), - Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply), + Self::UpgradeRegions(reply) => { + write!(f, "InstructionReply::UpgradeRegions({:?})", reply.replies) + } Self::DowngradeRegions(reply) => { - write!(f, "InstructionReply::DowngradeRegions({:?})", reply) + write!(f, "InstructionReply::DowngradeRegions({:?})", reply.replies) } Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply), Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply), @@ -685,9 +729,9 @@ impl InstructionReply { } } - pub fn expect_upgrade_region_reply(self) -> UpgradeRegionReply { + pub fn expect_upgrade_regions_reply(self) -> Vec { match self { - Self::UpgradeRegion(reply) => reply, + Self::UpgradeRegions(reply) => reply.replies, _ => panic!("Expected UpgradeRegion reply"), } } @@ -749,25 +793,58 @@ mod tests { serialized ); - let downgrade_region = InstructionReply::DowngradeRegions(DowngradeRegionsReply::single( - DowngradeRegionReply { + let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion { + region_id: RegionId::new(1024, 1), + last_entry_id: None, + metadata_last_entry_id: None, + replay_timeout: Some(Duration::from_millis(1000)), + location_id: None, + replay_entry_id: None, + metadata_replay_entry_id: None, + }]); + + let serialized = serde_json::to_string(&upgrade_region).unwrap(); + assert_eq!( + r#"{"UpgradeRegions":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null}]}"#, + serialized + ); + } + + #[test] + fn test_serialize_instruction_reply() { + let downgrade_region_reply = InstructionReply::DowngradeRegions( + DowngradeRegionsReply::single(DowngradeRegionReply { region_id: RegionId::new(1024, 1), last_entry_id: None, metadata_last_entry_id: None, exists: true, error: None, - }, - )); + }), + ); - let serialized = serde_json::to_string(&downgrade_region).unwrap(); + let serialized = serde_json::to_string(&downgrade_region_reply).unwrap(); assert_eq!( r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#, serialized - ) + ); + + let upgrade_region_reply = + InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply { + region_id: RegionId::new(1024, 1), + ready: true, + exists: true, + error: None, + })); + let serialized = serde_json::to_string(&upgrade_region_reply).unwrap(); + assert_eq!( + r#"{"type":"upgrade_regions","replies":[{"region_id":4398046511105,"ready":true,"exists":true,"error":null}]}"#, + serialized + ); } #[test] fn test_deserialize_instruction() { + // legacy open region instruction let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#; let open_region_instruction: Instruction = serde_json::from_str(open_region_instruction).unwrap(); @@ -785,6 +862,7 @@ mod tests { )]); assert_eq!(open_region_instruction, open_region); + // legacy close region instruction let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#; let close_region_instruction: Instruction = serde_json::from_str(close_region_instruction).unwrap(); @@ -796,6 +874,7 @@ mod tests { }]); assert_eq!(close_region_instruction, close_region); + // legacy downgrade region instruction let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#; let downgrade_region_instruction: Instruction = serde_json::from_str(downgrade_region_instruction).unwrap(); @@ -805,6 +884,25 @@ mod tests { }]); assert_eq!(downgrade_region_instruction, downgrade_region); + // legacy upgrade region instruction + let upgrade_region_instruction = r#"{"UpgradeRegion":{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"replay_timeout":"1s","location_id":null,"replay_entry_id":null,"metadata_replay_entry_id":null}}"#; + let upgrade_region_instruction: Instruction = + serde_json::from_str(upgrade_region_instruction).unwrap(); + let upgrade_region = Instruction::UpgradeRegions(vec![UpgradeRegion { + region_id: RegionId::new(1024, 1), + last_entry_id: None, + metadata_last_entry_id: None, + replay_timeout: Some(Duration::from_millis(1000)), + location_id: None, + replay_entry_id: None, + metadata_replay_entry_id: None, + }]); + assert_eq!(upgrade_region_instruction, upgrade_region); + } + + #[test] + fn test_deserialize_instruction_reply() { + // legacy close region reply let close_region_instruction_reply = r#"{"result":true,"error":null,"type":"close_region"}"#; let close_region_instruction_reply: InstructionReply = @@ -815,6 +913,7 @@ mod tests { }); assert_eq!(close_region_instruction_reply, close_region_reply); + // legacy open region reply let open_region_instruction_reply = r#"{"result":true,"error":null,"type":"open_region"}"#; let open_region_instruction_reply: InstructionReply = serde_json::from_str(open_region_instruction_reply).unwrap(); @@ -824,6 +923,7 @@ mod tests { }); assert_eq!(open_region_instruction_reply, open_region_reply); + // legacy downgrade region reply let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#; let downgrade_region_instruction_reply: InstructionReply = serde_json::from_str(downgrade_region_instruction_reply).unwrap(); @@ -837,6 +937,19 @@ mod tests { }), ); assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply); + + // legacy upgrade region reply + let upgrade_region_instruction_reply = r#"{"region_id":4398046511105,"ready":true,"exists":true,"error":null,"type":"upgrade_region"}"#; + let upgrade_region_instruction_reply: InstructionReply = + serde_json::from_str(upgrade_region_instruction_reply).unwrap(); + let upgrade_region_reply = + InstructionReply::UpgradeRegions(UpgradeRegionsReply::single(UpgradeRegionReply { + region_id: RegionId::new(1024, 1), + ready: true, + exists: true, + error: None, + })); + assert_eq!(upgrade_region_instruction_reply, upgrade_region_reply); } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 8573314b82..bb81fc1c6b 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -114,7 +114,7 @@ impl RegionHeartbeatResponseHandler { )), Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())), Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())), - Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler.into())), + Instruction::UpgradeRegions(_) => Ok(Box::new(UpgradeRegionsHandler.into())), Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())), Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())), Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(), @@ -194,7 +194,7 @@ dispatch_instr!( OpenRegions => OpenRegions, FlushRegions => FlushRegions, DowngradeRegions => DowngradeRegions, - UpgradeRegion => UpgradeRegions, + UpgradeRegions => UpgradeRegions, GetFileRefs => GetFileRefs, GcRegions => GcRegions, ); @@ -334,10 +334,10 @@ mod tests { ); // Upgrade region - let instruction = Instruction::UpgradeRegion(UpgradeRegion { + let instruction = Instruction::UpgradeRegions(vec![UpgradeRegion { region_id, ..Default::default() - }); + }]); assert!( heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction))) ); diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs index 06769dcb77..4271a7ed30 100644 --- a/src/datanode/src/heartbeat/handler/upgrade_region.rs +++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply}; +use common_meta::instruction::{ + InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply, +}; use common_telemetry::{info, warn}; +use futures::future::join_all; use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint}; use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; @@ -22,115 +25,151 @@ use crate::heartbeat::task_tracker::WaitResult; #[derive(Debug, Clone, Copy, Default)] pub struct UpgradeRegionsHandler; +impl UpgradeRegionsHandler { + // Handles upgrade regions instruction. + // + // Returns batch of upgrade region replies, the order of the replies is not guaranteed. + async fn handle_upgrade_regions( + &self, + ctx: &HandlerContext, + upgrade_regions: Vec, + ) -> Vec { + let mut replies = Vec::with_capacity(upgrade_regions.len()); + let mut catchup_request = Vec::with_capacity(upgrade_regions.len()); + + for upgrade_region in upgrade_regions { + let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id) + else { + replies.push(UpgradeRegionReply { + region_id: upgrade_region.region_id, + ready: false, + exists: false, + error: None, + }); + continue; + }; + + if writable { + replies.push(UpgradeRegionReply { + region_id: upgrade_region.region_id, + ready: true, + exists: true, + error: None, + }); + } else { + let UpgradeRegion { + last_entry_id, + metadata_last_entry_id, + location_id, + replay_entry_id, + metadata_replay_entry_id, + replay_timeout, + .. + } = upgrade_region; + + let checkpoint = match (replay_entry_id, metadata_replay_entry_id) { + (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint { + entry_id, + metadata_entry_id, + }), + _ => None, + }; + + catchup_request.push(( + upgrade_region.region_id, + replay_timeout.unwrap_or_default(), + RegionCatchupRequest { + set_writable: true, + entry_id: last_entry_id, + metadata_entry_id: metadata_last_entry_id, + location_id, + checkpoint, + }, + )); + } + } + + let mut wait_results = Vec::with_capacity(catchup_request.len()); + + for (region_id, replay_timeout, catchup_request) in catchup_request { + let region_server_moved = ctx.region_server.clone(); + // TODO(weny): parallelize the catchup tasks. + let result = ctx + .catchup_tasks + .try_register( + region_id, + Box::pin(async move { + info!( + "Executing region: {region_id} catchup to: last entry id {:?}", + catchup_request.entry_id + ); + region_server_moved + .handle_request(region_id, RegionRequest::Catchup(catchup_request)) + .await?; + Ok(()) + }), + ) + .await; + + if result.is_busy() { + warn!("Another catchup task is running for the region: {region_id}"); + } + + // We don't care that it returns a newly registered or running task. + let mut watcher = result.into_watcher(); + wait_results.push(( + region_id, + ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await, + )); + } + + let results = join_all( + wait_results + .into_iter() + .map(|(region_id, result)| async move { + match result { + WaitResult::Timeout => UpgradeRegionReply { + region_id, + ready: false, + exists: true, + error: None, + }, + WaitResult::Finish(Ok(_)) => UpgradeRegionReply { + region_id, + ready: true, + exists: true, + error: None, + }, + WaitResult::Finish(Err(err)) => UpgradeRegionReply { + region_id, + ready: false, + exists: true, + error: Some(format!("{err:?}")), + }, + } + }), + ) + .await; + + replies.extend(results.into_iter()); + replies + } +} + #[async_trait::async_trait] impl InstructionHandler for UpgradeRegionsHandler { - type Instruction = UpgradeRegion; + type Instruction = Vec; async fn handle( &self, ctx: &HandlerContext, - UpgradeRegion { - region_id, - last_entry_id, - metadata_last_entry_id, - replay_timeout, - location_id, - replay_entry_id, - metadata_replay_entry_id, - }: UpgradeRegion, + upgrade_regions: Self::Instruction, ) -> Option { - let Some(writable) = ctx.region_server.is_region_leader(region_id) else { - return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: false, - exists: false, - error: None, - })); - }; + let replies = self.handle_upgrade_regions(ctx, upgrade_regions).await; - if writable { - return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: true, - exists: true, - error: None, - })); - } - - let region_server_moved = ctx.region_server.clone(); - - let checkpoint = match (replay_entry_id, metadata_replay_entry_id) { - (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint { - entry_id, - metadata_entry_id, - }), - _ => None, - }; - - // The catchup task is almost zero cost if the inside region is writable. - // Therefore, it always registers a new catchup task. - let register_result = ctx - .catchup_tasks - .try_register( - region_id, - Box::pin(async move { - info!( - "Executing region: {region_id} catchup to: last entry id {last_entry_id:?}" - ); - region_server_moved - .handle_request( - region_id, - RegionRequest::Catchup(RegionCatchupRequest { - set_writable: true, - entry_id: last_entry_id, - metadata_entry_id: metadata_last_entry_id, - location_id, - checkpoint, - }), - ) - .await?; - - Ok(()) - }), - ) - .await; - - if register_result.is_busy() { - warn!("Another catchup task is running for the region: {region_id}"); - } - - // Returns immediately - let Some(replay_timeout) = replay_timeout else { - return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: false, - exists: true, - error: None, - })); - }; - - // We don't care that it returns a newly registered or running task. - let mut watcher = register_result.into_watcher(); - let result = ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await; - - match result { - WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: false, - exists: true, - error: None, - })), - WaitResult::Finish(Ok(_)) => { - Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: true, - exists: true, - error: None, - })) - } - WaitResult::Finish(Err(err)) => { - Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: false, - exists: true, - error: Some(format!("{err:?}")), - })) - } - } + Some(InstructionReply::UpgradeRegions(UpgradeRegionsReply::new( + replies, + ))) } } @@ -164,15 +203,15 @@ mod tests { let reply = UpgradeRegionsHandler .handle( &handler_context, - UpgradeRegion { + vec![UpgradeRegion { region_id, replay_timeout, ..Default::default() - }, + }], ) .await; - let reply = reply.unwrap().expect_upgrade_region_reply(); + let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; assert!(!reply.exists); assert!(reply.error.is_none()); } @@ -201,15 +240,15 @@ mod tests { let reply = UpgradeRegionsHandler .handle( &handler_context, - UpgradeRegion { + vec![UpgradeRegion { region_id, replay_timeout, ..Default::default() - }, + }], ) .await; - let reply = reply.unwrap().expect_upgrade_region_reply(); + let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; assert!(reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -239,15 +278,15 @@ mod tests { let reply = UpgradeRegionsHandler .handle( &handler_context, - UpgradeRegion { + vec![UpgradeRegion { region_id, replay_timeout, ..Default::default() - }, + }], ) .await; - let reply = reply.unwrap().expect_upgrade_region_reply(); + let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; assert!(!reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -280,15 +319,15 @@ mod tests { let reply = UpgradeRegionsHandler .handle( &handler_context, - UpgradeRegion { + vec![UpgradeRegion { region_id, replay_timeout, ..Default::default() - }, + }], ) .await; - let reply = reply.unwrap().expect_upgrade_region_reply(); + let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; assert!(!reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -298,17 +337,17 @@ mod tests { let reply = UpgradeRegionsHandler .handle( &handler_context, - UpgradeRegion { + vec![UpgradeRegion { region_id, replay_timeout: Some(Duration::from_millis(500)), ..Default::default() - }, + }], ) .await; // Must less than 300 ms. assert!(timer.elapsed().as_millis() < 300); - let reply = reply.unwrap().expect_upgrade_region_reply(); + let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; assert!(reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -339,15 +378,15 @@ mod tests { let reply = UpgradeRegionsHandler .handle( &handler_context, - UpgradeRegion { + vec![UpgradeRegion { region_id, ..Default::default() - }, + }], ) .await; // It didn't wait for handle returns; it had no idea about the error. - let reply = reply.unwrap().expect_upgrade_region_reply(); + let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; assert!(!reply.ready); assert!(reply.exists); assert!(reply.error.is_none()); @@ -355,18 +394,18 @@ mod tests { let reply = UpgradeRegionsHandler .handle( &handler_context, - UpgradeRegion { + vec![UpgradeRegion { region_id, replay_timeout: Some(Duration::from_millis(200)), ..Default::default() - }, + }], ) .await; - let reply = reply.unwrap().expect_upgrade_region_reply(); + let reply = &reply.unwrap().expect_upgrade_regions_reply()[0]; assert!(!reply.ready); assert!(reply.exists); assert!(reply.error.is_some()); - assert!(reply.error.unwrap().contains("mock_error")); + assert!(reply.error.as_ref().unwrap().contains("mock_error")); } } diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 2dfaa21a89..7c3b3e4e19 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -17,7 +17,9 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::ddl::utils::parse_region_wal_options; -use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply}; +use common_meta::instruction::{ + Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply, +}; use common_meta::lock_key::RemoteWalLock; use common_meta::wal_options_allocator::extract_topic_from_wal_options; use common_procedure::{Context as ProcedureContext, Status}; @@ -131,7 +133,7 @@ impl UpgradeCandidateRegion { None }; - let upgrade_instruction = Instruction::UpgradeRegion( + let upgrade_instruction = Instruction::UpgradeRegions(vec![ UpgradeRegion { region_id, last_entry_id, @@ -143,7 +145,7 @@ impl UpgradeCandidateRegion { } .with_replay_entry_id(checkpoint.map(|c| c.entry_id)) .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)), - ); + ]); Ok(upgrade_instruction) } @@ -193,11 +195,7 @@ impl UpgradeCandidateRegion { match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; - let InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready, - exists, - error, - }) = reply + let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply else { return error::UnexpectedInstructionReplySnafu { mailbox_message: msg.to_string(), @@ -205,6 +203,13 @@ impl UpgradeCandidateRegion { } .fail(); }; + // TODO(weny): handle multiple replies. + let UpgradeRegionReply { + ready, + exists, + error, + .. + } = &replies[0]; // Notes: The order of handling is important. if error.is_some() { diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 247f112514..1586ad5f5f 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -18,7 +18,7 @@ use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage}; use common_meta::instruction::{ DowngradeRegionReply, DowngradeRegionsReply, FlushRegionReply, InstructionReply, SimpleReply, - UpgradeRegionReply, + UpgradeRegionReply, UpgradeRegionsReply, }; use common_meta::key::TableMetadataManagerRef; use common_meta::key::table_route::TableRouteValue; @@ -212,11 +212,14 @@ pub fn new_upgrade_region_reply( to: "meta".to_string(), timestamp_millis: current_time_millis(), payload: Some(Payload::Json( - serde_json::to_string(&InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready, - exists, - error, - })) + serde_json::to_string(&InstructionReply::UpgradeRegions( + UpgradeRegionsReply::single(UpgradeRegionReply { + region_id: RegionId::new(0, 0), + ready, + exists, + error, + }), + )) .unwrap(), )), } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index eabd98b135..6ba403e4c8 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -46,7 +46,9 @@ use store_api::region_engine::{ RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, SettableRegionRoleState, SyncManifestResponse, }; -use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest}; +use store_api::region_request::{ + BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest, +}; use store_api::storage::{RegionId, ScanRequest, SequenceNumber}; use crate::config::EngineConfig; @@ -142,6 +144,17 @@ impl RegionEngine for MetricEngine { .map_err(BoxedError::new) } + async fn handle_batch_catchup_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionCatchupRequest)>, + ) -> Result { + self.inner + .handle_batch_catchup_requests(parallelism, requests) + .await + .map_err(BoxedError::new) + } + async fn handle_batch_ddl_requests( &self, batch_request: BatchRegionDdlRequest, diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs index 6ae4560228..d6544f6c9e 100644 --- a/src/metric-engine/src/engine/catchup.rs +++ b/src/metric-engine/src/engine/catchup.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + +use common_error::ext::BoxedError; use common_telemetry::debug; use snafu::{OptionExt, ResultExt}; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{BatchResponses, RegionEngine}; use store_api::region_request::{ AffectedRows, RegionCatchupRequest, RegionRequest, ReplayCheckpoint, }; @@ -22,11 +25,101 @@ use store_api::storage::RegionId; use crate::engine::MetricEngineInner; use crate::error::{ - MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu, + BatchCatchupMitoRegionSnafu, MitoCatchupOperationSnafu, PhysicalRegionNotFoundSnafu, Result, + UnsupportedRegionRequestSnafu, }; use crate::utils; impl MetricEngineInner { + pub async fn handle_batch_catchup_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionCatchupRequest)>, + ) -> Result { + let mut all_requests = Vec::with_capacity(requests.len() * 2); + let mut physical_region_options_list = Vec::with_capacity(requests.len()); + + for (region_id, req) in requests { + let metadata_region_id = utils::to_metadata_region_id(region_id); + let data_region_id = utils::to_data_region_id(region_id); + + let physical_region_options = *self + .state + .read() + .unwrap() + .physical_region_states() + .get(&data_region_id) + .context(PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + })? + .options(); + physical_region_options_list.push((data_region_id, physical_region_options)); + all_requests.push(( + metadata_region_id, + RegionCatchupRequest { + set_writable: req.set_writable, + entry_id: req.metadata_entry_id, + metadata_entry_id: None, + location_id: req.location_id, + checkpoint: req.checkpoint.map(|c| ReplayCheckpoint { + entry_id: c.metadata_entry_id.unwrap_or_default(), + metadata_entry_id: None, + }), + }, + )); + all_requests.push(( + data_region_id, + RegionCatchupRequest { + set_writable: req.set_writable, + entry_id: req.entry_id, + metadata_entry_id: None, + location_id: req.location_id, + checkpoint: req.checkpoint.map(|c| ReplayCheckpoint { + entry_id: c.entry_id, + metadata_entry_id: None, + }), + }, + )); + } + + let mut results = self + .mito + .handle_batch_catchup_requests(parallelism, all_requests) + .await + .context(BatchCatchupMitoRegionSnafu {})? + .into_iter() + .collect::>(); + + let mut responses = Vec::with_capacity(physical_region_options_list.len()); + for (physical_region_id, physical_region_options) in physical_region_options_list { + let metadata_region_id = utils::to_metadata_region_id(physical_region_id); + let data_region_id = utils::to_data_region_id(physical_region_id); + let metadata_region_result = results.remove(&metadata_region_id); + let data_region_result = results.remove(&data_region_id); + + // Pass the optional `metadata_region_result` and `data_region_result` to + // `recover_physical_region_with_results`. This function handles errors for each + // catchup physical region request, allowing the process to continue with the + // remaining regions even if some requests fail. + let response = self + .recover_physical_region_with_results( + metadata_region_result, + data_region_result, + physical_region_id, + physical_region_options, + // Note: We intentionally don’t close the region if recovery fails. + // Closing it here might confuse the region server since it links RegionIds to Engines. + // If recovery didn’t succeed, the region should stay open. + false, + ) + .await + .map_err(BoxedError::new); + responses.push((physical_region_id, response)); + } + + Ok(responses) + } + pub async fn catchup_region( &self, region_id: RegionId, diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 895bb9ed14..afbc9c9772 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -72,17 +72,19 @@ impl MetricEngineInner { let metadata_region_id = utils::to_metadata_region_id(physical_region_id); let data_region_id = utils::to_data_region_id(physical_region_id); let metadata_region_result = results.remove(&metadata_region_id); - let data_region_result = results.remove(&data_region_id); + let data_region_result: Option> = + results.remove(&data_region_id); // Pass the optional `metadata_region_result` and `data_region_result` to - // `open_physical_region_with_results`. This function handles errors for each + // `recover_physical_region_with_results`. This function handles errors for each // open physical region request, allowing the process to continue with the // remaining regions even if some requests fail. let response = self - .open_physical_region_with_results( + .recover_physical_region_with_results( metadata_region_result, data_region_result, physical_region_id, physical_region_options, + true, ) .await .map_err(BoxedError::new); @@ -107,12 +109,13 @@ impl MetricEngineInner { } } - async fn open_physical_region_with_results( + pub(crate) async fn recover_physical_region_with_results( &self, metadata_region_result: Option>, data_region_result: Option>, physical_region_id: RegionId, physical_region_options: PhysicalRegionOptions, + close_region_on_failure: bool, ) -> Result { let metadata_region_id = utils::to_metadata_region_id(physical_region_id); let data_region_id = utils::to_data_region_id(physical_region_id); @@ -136,8 +139,10 @@ impl MetricEngineInner { .recover_states(physical_region_id, physical_region_options) .await { - self.close_physical_region_on_recovery_failure(physical_region_id) - .await; + if close_region_on_failure { + self.close_physical_region_on_recovery_failure(physical_region_id) + .await; + } return Err(err); } Ok(data_region_response) diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 91881b5624..d5185bf4e4 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -50,6 +50,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to batch catchup mito region"))] + BatchCatchupMitoRegion { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("No open region result for region {}", region_id))] NoOpenRegionResult { region_id: RegionId, @@ -361,7 +368,8 @@ impl ErrorExt for Error { | MitoFlushOperation { source, .. } | MitoDeleteOperation { source, .. } | MitoSyncOperation { source, .. } - | BatchOpenMitoRegion { source, .. } => source.status_code(), + | BatchOpenMitoRegion { source, .. } + | BatchCatchupMitoRegion { source, .. } => source.status_code(), EncodePrimaryKey { source, .. } => source.status_code(), diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 42033e02e1..11871f09d5 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -21,6 +21,8 @@ mod append_mode_test; #[cfg(test)] mod basic_test; #[cfg(test)] +mod batch_catchup_test; +#[cfg(test)] mod batch_open_test; #[cfg(test)] mod bump_committed_sequence_test; @@ -91,7 +93,7 @@ use snafu::{OptionExt, ResultExt, ensure}; use store_api::ManifestVersion; use store_api::codec::PrimaryKeyEncoding; use store_api::logstore::LogStore; -use store_api::logstore::provider::Provider; +use store_api::logstore::provider::{KafkaProvider, Provider}; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::metric_engine_consts::{ MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY, @@ -100,7 +102,9 @@ use store_api::region_engine::{ BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse, }; -use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; +use store_api::region_request::{ + AffectedRows, RegionCatchupRequest, RegionOpenRequest, RegionRequest, +}; use store_api::sst_entry::{ManifestSstEntry, PuffinIndexMetaEntry, StorageSstEntry}; use store_api::storage::{FileId, FileRefsManifest, RegionId, ScanRequest, SequenceNumber}; use tokio::sync::{Semaphore, oneshot}; @@ -772,6 +776,122 @@ impl EngineInner { Ok(responses) } + async fn catchup_topic_regions( + &self, + provider: Provider, + region_requests: Vec<(RegionId, RegionCatchupRequest)>, + ) -> Result)>> { + let now = Instant::now(); + let region_ids = region_requests + .iter() + .map(|(region_id, _)| *region_id) + .collect::>(); + let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers( + provider.clone(), + self.wal_raw_entry_reader.clone(), + ®ion_ids, + DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE, + ); + + let mut responses = Vec::with_capacity(region_requests.len()); + for ((region_id, request), entry_receiver) in + region_requests.into_iter().zip(entry_receivers) + { + let (request, receiver) = + WorkerRequest::new_catchup_region_request(region_id, request, Some(entry_receiver)); + self.workers.submit_to_worker(region_id, request).await?; + responses.push(async move { receiver.await.context(RecvSnafu)? }); + } + + // Wait for entries distribution. + let distribution = + common_runtime::spawn_global(async move { distributor.distribute().await }); + // Wait for worker returns. + let responses = join_all(responses).await; + distribution.await.context(JoinSnafu)??; + + let num_failure = responses.iter().filter(|r| r.is_err()).count(); + info!( + "Caught up {} regions for topic '{}', failures: {}, elapsed: {:?}", + region_ids.len() - num_failure, + // Safety: provider is kafka provider. + provider.as_kafka_provider().unwrap(), + num_failure, + now.elapsed(), + ); + + Ok(region_ids.into_iter().zip(responses).collect()) + } + + async fn handle_batch_catchup_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionCatchupRequest)>, + ) -> Result)>> { + let mut responses = Vec::with_capacity(requests.len()); + let mut topic_regions: HashMap, Vec<_>> = HashMap::new(); + let mut remaining_region_requests = vec![]; + + for (region_id, request) in requests { + match self.workers.get_region(region_id) { + Some(region) => match region.provider.as_kafka_provider() { + Some(provider) => { + topic_regions + .entry(provider.clone()) + .or_default() + .push((region_id, request)); + } + None => { + remaining_region_requests.push((region_id, request)); + } + }, + None => responses.push((region_id, RegionNotFoundSnafu { region_id }.fail())), + } + } + + let semaphore = Arc::new(Semaphore::new(parallelism)); + + if !topic_regions.is_empty() { + let mut tasks = Vec::with_capacity(topic_regions.len()); + for (provider, region_requests) in topic_regions { + let semaphore_moved = semaphore.clone(); + tasks.push(async move { + // Safety: semaphore must exist + let _permit = semaphore_moved.acquire().await.unwrap(); + self.catchup_topic_regions(Provider::Kafka(provider), region_requests) + .await + }) + } + + let r = try_join_all(tasks).await?; + responses.extend(r.into_iter().flatten()); + } + + if !remaining_region_requests.is_empty() { + let mut tasks = Vec::with_capacity(remaining_region_requests.len()); + let mut region_ids = Vec::with_capacity(remaining_region_requests.len()); + for (region_id, request) in remaining_region_requests { + let semaphore_moved = semaphore.clone(); + region_ids.push(region_id); + tasks.push(async move { + // Safety: semaphore must exist + let _permit = semaphore_moved.acquire().await.unwrap(); + let (request, receiver) = + WorkerRequest::new_catchup_region_request(region_id, request, None); + + self.workers.submit_to_worker(region_id, request).await?; + + receiver.await.context(RecvSnafu)? + }) + } + + let results = join_all(tasks).await; + responses.extend(region_ids.into_iter().zip(results)); + } + + Ok(responses) + } + /// Handles [RegionRequest] and return its executed result. async fn handle_request( &self, @@ -914,6 +1034,29 @@ impl RegionEngine for MitoEngine { .map_err(BoxedError::new) } + #[tracing::instrument(skip_all)] + async fn handle_batch_catchup_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionCatchupRequest)>, + ) -> Result { + self.inner + .handle_batch_catchup_requests(parallelism, requests) + .await + .map(|responses| { + responses + .into_iter() + .map(|(region_id, response)| { + ( + region_id, + response.map(RegionResponse::new).map_err(BoxedError::new), + ) + }) + .collect::>() + }) + .map_err(BoxedError::new) + } + #[tracing::instrument(skip_all)] async fn handle_request( &self, diff --git a/src/mito2/src/engine/batch_catchup_test.rs b/src/mito2/src/engine/batch_catchup_test.rs new file mode 100644 index 0000000000..6f641abe26 --- /dev/null +++ b/src/mito2/src/engine/batch_catchup_test.rs @@ -0,0 +1,239 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::Rows; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_recordbatch::RecordBatches; +use common_wal::options::{KafkaWalOptions, WAL_OPTIONS_KEY, WalOptions}; +use rstest::rstest; +use rstest_reuse::apply; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{PathType, RegionCatchupRequest, RegionOpenRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::engine::MitoEngine; +use crate::test_util::{ + CreateRequestBuilder, LogStoreFactory, TestEnv, build_rows, flush_region, + kafka_log_store_factory, prepare_test_for_kafka_log_store, put_rows, rows_schema, + single_kafka_log_store_factory, +}; + +#[apply(single_kafka_log_store_factory)] +async fn test_batch_catchup(factory: Option) { + test_batch_catchup_with_format(factory.clone(), false).await; + test_batch_catchup_with_format(factory, true).await; +} + +async fn test_batch_catchup_with_format(factory: Option, flat_format: bool) { + common_telemetry::init_default_ut_logging(); + let Some(factory) = factory else { + return; + }; + let mut env = TestEnv::with_prefix("catchup-batch-regions") + .await + .with_log_store_factory(factory.clone()); + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + let topic = prepare_test_for_kafka_log_store(&factory).await; + + // FIXME(weny): change region number to 3. + let num_regions = 2u32; + let table_dir_fn = |region_id| format!("test/{region_id}"); + let mut region_schema = HashMap::new(); + + for id in 1..=num_regions { + let engine = engine.clone(); + let topic = topic.clone(); + let region_id = RegionId::new(1, id); + let request = CreateRequestBuilder::new() + .table_dir(&table_dir_fn(region_id)) + .kafka_topic(topic.clone()) + .build(); + let column_schemas = rows_schema(&request); + region_schema.insert(region_id, column_schemas); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + } + + for i in 0..10 { + for region_number in 1..=num_regions { + let region_id = RegionId::new(1, region_number); + let rows = Rows { + schema: region_schema[®ion_id].clone(), + rows: build_rows( + (region_number as usize) * 120 + i as usize, + (region_number as usize) * 120 + i as usize + 1, + ), + }; + put_rows(&engine, region_id, rows).await; + if i % region_number == 0 { + flush_region(&engine, region_id, None).await; + } + } + } + + let assert_result = |engine: MitoEngine| async move { + for i in 1..=num_regions { + let region_id = RegionId::new(1, i); + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let mut expected = String::new(); + expected.push_str( + "+-------+---------+---------------------+\n| tag_0 | field_0 | ts |\n+-------+---------+---------------------+\n", + ); + for row in 0..10 { + expected.push_str(&format!( + "| {} | {}.0 | 1970-01-01T00:{:02}:{:02} |\n", + i * 120 + row, + i * 120 + row, + 2 * i, + row + )); + } + expected.push_str("+-------+---------+---------------------+"); + assert_eq!(expected, batches.pretty_print().unwrap()); + } + }; + assert_result(engine.clone()).await; + + // Reopen engine. + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; + + let mut options = HashMap::new(); + if let Some(topic) = &topic { + options.insert( + WAL_OPTIONS_KEY.to_string(), + serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions { + topic: topic.clone(), + })) + .unwrap(), + ); + }; + let requests = (1..=num_regions) + .map(|id| { + let region_id = RegionId::new(1, id); + ( + region_id, + RegionOpenRequest { + engine: String::new(), + table_dir: table_dir_fn(region_id), + options: options.clone(), + skip_wal_replay: true, + path_type: PathType::Bare, + checkpoint: None, + }, + ) + }) + .collect::>(); + let results = engine + .handle_batch_open_requests(4, requests) + .await + .unwrap(); + for (_, result) in results { + assert!(result.is_ok()); + } + + let requests = (1..=num_regions) + .map(|id| { + let region_id = RegionId::new(1, id); + ( + region_id, + RegionCatchupRequest { + set_writable: true, + entry_id: None, + metadata_entry_id: None, + location_id: None, + checkpoint: None, + }, + ) + }) + .collect::>(); + + let results = engine + .handle_batch_catchup_requests(4, requests) + .await + .unwrap(); + for (_, result) in results { + assert!(result.is_ok()); + } + assert_result(engine.clone()).await; +} + +#[apply(single_kafka_log_store_factory)] +async fn test_batch_catchup_err(factory: Option) { + test_batch_catchup_err_with_format(factory.clone(), false).await; + test_batch_catchup_err_with_format(factory, true).await; +} + +async fn test_batch_catchup_err_with_format(factory: Option, flat_format: bool) { + common_telemetry::init_default_ut_logging(); + let Some(factory) = factory else { + return; + }; + let mut env = TestEnv::with_prefix("catchup-regions-err") + .await + .with_log_store_factory(factory.clone()); + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + let num_regions = 3u32; + let requests = (1..num_regions) + .map(|id| { + let region_id = RegionId::new(1, id); + ( + region_id, + RegionCatchupRequest { + set_writable: true, + entry_id: None, + metadata_entry_id: None, + location_id: None, + checkpoint: None, + }, + ) + }) + .collect::>(); + + let results = engine + .handle_batch_catchup_requests(4, requests) + .await + .unwrap(); + for (_, result) in results { + assert_eq!( + result.unwrap_err().status_code(), + StatusCode::RegionNotFound + ); + } +} diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 794576a23c..606fbbb7ea 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -602,6 +602,7 @@ pub(crate) enum WorkerRequest { } impl WorkerRequest { + /// Creates a new open region request. pub(crate) fn new_open_region_request( region_id: RegionId, request: RegionOpenRequest, @@ -618,6 +619,21 @@ impl WorkerRequest { (worker_request, receiver) } + /// Creates a new catchup region request. + pub(crate) fn new_catchup_region_request( + region_id: RegionId, + request: RegionCatchupRequest, + entry_receiver: Option, + ) -> (WorkerRequest, Receiver>) { + let (sender, receiver) = oneshot::channel(); + let worker_request = WorkerRequest::Ddl(SenderDdlRequest { + region_id, + sender: sender.into(), + request: DdlRequest::Catchup((request, entry_receiver)), + }); + (worker_request, receiver) + } + /// Converts request from a [RegionRequest]. pub(crate) fn try_from_region_request( region_id: RegionId, @@ -701,7 +717,7 @@ impl WorkerRequest { RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, sender: sender.into(), - request: DdlRequest::Catchup(v), + request: DdlRequest::Catchup((v, None)), }), RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts { metadata: region_metadata, @@ -757,7 +773,7 @@ pub(crate) enum DdlRequest { Compact(RegionCompactRequest), BuildIndex(RegionBuildIndexRequest), Truncate(RegionTruncateRequest), - Catchup(RegionCatchupRequest), + Catchup((RegionCatchupRequest, Option)), } /// Sender and Ddl request. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 4f0b2b9fce..2d442da363 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -1027,7 +1027,10 @@ impl RegionWorkerLoop { .await; continue; } - DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await, + DdlRequest::Catchup((req, wal_entry_receiver)) => { + self.handle_catchup_request(ddl.region_id, req, wal_entry_receiver) + .await + } }; ddl.sender.send(res); diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index caabb6ae55..2cbdab75c2 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -28,6 +28,7 @@ use tokio::time::Instant; use crate::error::{self, Result}; use crate::region::MitoRegion; use crate::region::opener::{RegionOpener, replay_memtable}; +use crate::wal::entry_distributor::WalEntryReceiver; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -35,6 +36,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionCatchupRequest, + entry_receiver: Option, ) -> Result { let Some(region) = self.regions.get_region(region_id) else { return error::RegionNotFoundSnafu { region_id }.fail(); @@ -76,9 +78,10 @@ impl RegionWorkerLoop { region.provider ); let timer = Instant::now(); - let wal_entry_reader = + let wal_entry_reader = entry_receiver.map(|r| Box::new(r) as _).unwrap_or_else(|| { self.wal - .wal_entry_reader(®ion.provider, region_id, request.location_id); + .wal_entry_reader(®ion.provider, region_id, request.location_id) + }); let on_region_opened = self.wal.on_region_opened(); let last_entry_id = replay_memtable( ®ion.provider, diff --git a/src/store-api/src/logstore/provider.rs b/src/store-api/src/logstore/provider.rs index 42c7ef12c3..b6f7c8b9fb 100644 --- a/src/store-api/src/logstore/provider.rs +++ b/src/store-api/src/logstore/provider.rs @@ -42,7 +42,7 @@ impl Display for KafkaProvider { } // The Provider of raft engine log store -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct RaftEngineProvider { pub id: u64, } @@ -59,7 +59,7 @@ impl RaftEngineProvider { } /// The Provider of LogStore -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Provider { RaftEngine(RaftEngineProvider), Kafka(Arc), diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index fe8df673d0..80254fd39c 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -34,7 +34,9 @@ use tokio::sync::Semaphore; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; -use crate::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest}; +use crate::region_request::{ + BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest, +}; use crate::storage::{RegionId, ScanRequest, SequenceNumber}; /// The settable region role state. @@ -715,6 +717,30 @@ pub trait RegionEngine: Send + Sync { Ok(join_all(tasks).await) } + async fn handle_batch_catchup_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionCatchupRequest)>, + ) -> Result { + let semaphore = Arc::new(Semaphore::new(parallelism)); + let mut tasks = Vec::with_capacity(requests.len()); + + for (region_id, request) in requests { + let semaphore_moved = semaphore.clone(); + + tasks.push(async move { + // Safety: semaphore must exist + let _permit = semaphore_moved.acquire().await.unwrap(); + let result = self + .handle_request(region_id, RegionRequest::Catchup(request)) + .await; + (region_id, result) + }); + } + + Ok(join_all(tasks).await) + } + async fn handle_batch_ddl_requests( &self, request: BatchRegionDdlRequest,