diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 9a9d955f58..c7bd82d675 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -55,6 +55,10 @@ impl Display for RegionIdent {
 /// The result of downgrade leader region.
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 pub struct DowngradeRegionReply {
+    /// The [RegionId].
+    /// For backward compatibility, it defaults to [RegionId::new(0, 0)].
+    #[serde(default)]
+    pub region_id: RegionId,
     /// Returns the `last_entry_id` if available.
     pub last_entry_id: Option<u64>,
     /// Returns the `metadata_last_entry_id` if available (Only available for metric engine).
@@ -423,14 +427,60 @@ pub enum Instruction {
     CloseRegions(Vec<RegionIdent>),
     /// Upgrades a region.
     UpgradeRegion(UpgradeRegion),
+    #[serde(
+        deserialize_with = "single_or_multiple_from",
+        alias = "DowngradeRegion"
+    )]
     /// Downgrades a region.
-    DowngradeRegion(DowngradeRegion),
+    DowngradeRegions(Vec<DowngradeRegion>),
     /// Invalidates batch cache.
     InvalidateCaches(Vec<CacheIdent>),
     /// Flushes regions.
     FlushRegions(FlushRegions),
 }
 
+impl Instruction {
+    /// Converts the instruction into a vector of [OpenRegion].
+    pub fn into_open_regions(self) -> Option<Vec<OpenRegion>> {
+        match self {
+            Self::OpenRegions(open_regions) => Some(open_regions),
+            _ => None,
+        }
+    }
+
+    /// Converts the instruction into a vector of [RegionIdent].
+    pub fn into_close_regions(self) -> Option<Vec<RegionIdent>> {
+        match self {
+            Self::CloseRegions(close_regions) => Some(close_regions),
+            _ => None,
+        }
+    }
+
+    /// Converts the instruction into a [FlushRegions].
+    pub fn into_flush_regions(self) -> Option<FlushRegions> {
+        match self {
+            Self::FlushRegions(flush_regions) => Some(flush_regions),
+            _ => None,
+        }
+    }
+
+    /// Converts the instruction into a vector of [DowngradeRegion].
+    pub fn into_downgrade_regions(self) -> Option<Vec<DowngradeRegion>> {
+        match self {
+            Self::DowngradeRegions(downgrade_region) => Some(downgrade_region),
+            _ => None,
+        }
+    }
+
+    /// Converts the instruction into an [UpgradeRegion].
+    pub fn into_upgrade_regions(self) -> Option<UpgradeRegion> {
+        match self {
+            Self::UpgradeRegion(upgrade_region) => Some(upgrade_region),
+            _ => None,
+        }
+    }
+}
+
 /// The reply of [UpgradeRegion].
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 pub struct UpgradeRegionReply {
@@ -452,6 +502,39 @@ impl Display for UpgradeRegionReply {
     }
 }
 
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+pub struct DowngradeRegionsReply {
+    pub replies: Vec<DowngradeRegionReply>,
+}
+
+impl DowngradeRegionsReply {
+    pub fn new(replies: Vec<DowngradeRegionReply>) -> Self {
+        Self { replies }
+    }
+
+    pub fn single(reply: DowngradeRegionReply) -> Self {
+        Self::new(vec![reply])
+    }
+}
+
+#[derive(Deserialize)]
+#[serde(untagged)]
+enum DowngradeRegionsCompat {
+    Single(DowngradeRegionReply),
+    Multiple(DowngradeRegionsReply),
+}
+
+fn downgrade_regions_compat_from<'de, D>(deserializer: D) -> Result<DowngradeRegionsReply, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let helper = DowngradeRegionsCompat::deserialize(deserializer)?;
+    Ok(match helper {
+        DowngradeRegionsCompat::Single(x) => DowngradeRegionsReply::new(vec![x]),
+        DowngradeRegionsCompat::Multiple(reply) => reply,
+    })
+}
+
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 #[serde(tag = "type", rename_all = "snake_case")]
 pub enum InstructionReply {
@@ -460,7 +543,11 @@ pub enum InstructionReply {
     #[serde(alias = "close_region")]
     CloseRegions(SimpleReply),
     UpgradeRegion(UpgradeRegionReply),
-    DowngradeRegion(DowngradeRegionReply),
+    #[serde(
+        alias = "downgrade_region",
+        deserialize_with = "downgrade_regions_compat_from"
+    )]
+    DowngradeRegions(DowngradeRegionsReply),
     FlushRegions(FlushRegionReply),
 }
 
@@ -470,8 +557,8 @@ impl Display for InstructionReply {
             Self::OpenRegions(reply) => write!(f, "InstructionReply::OpenRegions({})", reply),
             Self::CloseRegions(reply) => write!(f, "InstructionReply::CloseRegions({})", reply),
             Self::UpgradeRegion(reply) => write!(f, "InstructionReply::UpgradeRegion({})", reply),
-            Self::DowngradeRegion(reply) => {
-                write!(f, "InstructionReply::DowngradeRegion({})", reply)
+            Self::DowngradeRegions(reply) => {
+                write!(f, "InstructionReply::DowngradeRegions({:?})", reply)
             }
             Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
         }
@@ -493,6 +580,27 @@ impl InstructionReply {
             _ => panic!("Expected OpenRegions reply"),
         }
     }
+
+    pub fn expect_upgrade_region_reply(self) -> UpgradeRegionReply {
+        match self {
+            Self::UpgradeRegion(reply) => reply,
+            _ => panic!("Expected UpgradeRegion reply"),
+        }
+    }
+
+    pub fn expect_downgrade_regions_reply(self) -> Vec<DowngradeRegionReply> {
+        match self {
+            Self::DowngradeRegions(reply) => reply.replies,
+            _ => panic!("Expected DowngradeRegions reply"),
+        }
+    }
+
+    pub fn expect_flush_regions_reply(self) -> FlushRegionReply {
+        match self {
+            Self::FlushRegions(reply) => reply,
+            _ => panic!("Expected FlushRegions reply"),
+        }
+    }
 }
 
 #[cfg(test)]
@@ -532,11 +640,27 @@ mod tests {
         r#"{"CloseRegions":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#,
         serialized
     );
+
+        let downgrade_region = InstructionReply::DowngradeRegions(DowngradeRegionsReply::single(
+            DowngradeRegionReply {
+                region_id: RegionId::new(1024, 1),
+                last_entry_id: None,
+                metadata_last_entry_id: None,
+                exists: true,
+                error: None,
+            },
+        ));
+
+        let serialized = serde_json::to_string(&downgrade_region).unwrap();
+        assert_eq!(
+            r#"{"type":"downgrade_regions","replies":[{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null}]}"#,
+            serialized
+        )
     }
 
     #[test]
     fn test_deserialize_instruction() {
-        let open_region_instruction =
r#"{"OpenRegion":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#; + let open_region_instruction = r#"{"OpenRegion":{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}}"#; let open_region_instruction: Instruction = serde_json::from_str(open_region_instruction).unwrap(); let open_region = Instruction::OpenRegions(vec![OpenRegion::new( @@ -553,7 +677,7 @@ mod tests { )]); assert_eq!(open_region_instruction, open_region); - let close_region_instruction = r#"{"CloseRegion":[{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}]}"#; + let close_region_instruction = r#"{"CloseRegion":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#; let close_region_instruction: Instruction = serde_json::from_str(close_region_instruction).unwrap(); let close_region = Instruction::CloseRegions(vec![RegionIdent { @@ -564,6 +688,15 @@ mod tests { }]); assert_eq!(close_region_instruction, close_region); + let downgrade_region_instruction = r#"{"DowngradeRegions":{"region_id":4398046511105,"flush_timeout":{"secs":1,"nanos":0}}}"#; + let downgrade_region_instruction: Instruction = + serde_json::from_str(downgrade_region_instruction).unwrap(); + let downgrade_region = Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id: RegionId::new(1024, 1), + flush_timeout: Some(Duration::from_millis(1000)), + }]); + assert_eq!(downgrade_region_instruction, downgrade_region); + let close_region_instruction_reply = r#"{"result":true,"error":null,"type":"close_region"}"#; let close_region_instruction_reply: InstructionReply = @@ -582,6 +715,20 @@ mod tests { error: None, }); assert_eq!(open_region_instruction_reply, open_region_reply); + + let downgrade_region_instruction_reply = r#"{"region_id":4398046511105,"last_entry_id":null,"metadata_last_entry_id":null,"exists":true,"error":null,"type":"downgrade_region"}"#; + let downgrade_region_instruction_reply: InstructionReply = + serde_json::from_str(downgrade_region_instruction_reply).unwrap(); + let downgrade_region_reply = InstructionReply::DowngradeRegions( + DowngradeRegionsReply::single(DowngradeRegionReply { + region_id: RegionId::new(1024, 1), + last_entry_id: None, + metadata_last_entry_id: None, + exists: true, + error: None, + }), + ); + assert_eq!(downgrade_region_instruction_reply, downgrade_region_reply); } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 14a671a14b..71b3181a04 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -13,16 +13,13 @@ // limitations under the License. 
 use async_trait::async_trait;
-use common_meta::RegionIdent;
 use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
 use common_meta::heartbeat::handler::{
     HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
 };
 use common_meta::instruction::{Instruction, InstructionReply};
 use common_telemetry::error;
-use futures::future::BoxFuture;
 use snafu::OptionExt;
-use store_api::storage::RegionId;
 
 mod close_region;
 mod downgrade_region;
@@ -30,10 +27,15 @@ mod flush_region;
 mod open_region;
 mod upgrade_region;
 
+use crate::heartbeat::handler::close_region::CloseRegionsHandler;
+use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
+use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
+use crate::heartbeat::handler::open_region::OpenRegionsHandler;
+use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
 use crate::heartbeat::task_tracker::TaskTracker;
 use crate::region_server::RegionServer;
 
-/// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion].
+/// The handler for [`Instruction`]s.
 #[derive(Clone)]
 pub struct RegionHeartbeatResponseHandler {
     region_server: RegionServer,
@@ -43,9 +45,14 @@ pub struct RegionHeartbeatResponseHandler {
     open_region_parallelism: usize,
 }
 
-/// Handler of the instruction.
-pub type InstructionHandler =
-    Box<dyn FnOnce(HandlerContext) -> BoxFuture<'static, Option<InstructionReply>> + Send>;
+#[async_trait::async_trait]
+pub trait InstructionHandler: Send + Sync {
+    async fn handle(
+        &self,
+        ctx: &HandlerContext,
+        instruction: Instruction,
+    ) -> Option<InstructionReply>;
+}
 
 #[derive(Clone)]
 pub struct HandlerContext {
@@ -56,10 +63,6 @@ impl HandlerContext {
-    fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
-        RegionId::new(region_ident.table_id, region_ident.region_number)
-    }
-
     #[cfg(test)]
     pub fn new_for_test(region_server: RegionServer) -> Self {
         Self {
@@ -90,31 +93,16 @@ impl RegionHeartbeatResponseHandler {
         self
     }
 
-    /// Builds the [InstructionHandler].
-    fn build_handler(&self, instruction: Instruction) -> MetaResult<InstructionHandler> {
+    fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<dyn InstructionHandler>> {
         match instruction {
-            Instruction::OpenRegions(open_regions) => {
-                let open_region_parallelism = self.open_region_parallelism;
-                Ok(Box::new(move |handler_context| {
-                    handler_context
-                        .handle_open_regions_instruction(open_regions, open_region_parallelism)
-                }))
-            }
-            Instruction::CloseRegions(close_regions) => Ok(Box::new(move |handler_context| {
-                handler_context.handle_close_regions_instruction(close_regions)
-            })),
-            Instruction::DowngradeRegion(downgrade_region) => {
-                Ok(Box::new(move |handler_context| {
-                    handler_context.handle_downgrade_region_instruction(downgrade_region)
-                }))
-            }
-            Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
-                handler_context.handle_upgrade_region_instruction(upgrade_region)
+            Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler)),
+            Instruction::OpenRegions(_) => Ok(Box::new(OpenRegionsHandler {
+                open_region_parallelism: self.open_region_parallelism,
             })),
+            Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler)),
+            Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler)),
+            Instruction::UpgradeRegion(_) => Ok(Box::new(UpgradeRegionsHandler)),
             Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
-            Instruction::FlushRegions(flush_regions) => Ok(Box::new(move |handler_context| {
-                handler_context.handle_flush_regions_instruction(flush_regions)
-            })),
         }
     }
 }
@@ -124,7 +112,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
     fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
         matches!(ctx.incoming_message.as_ref(), |Some((
             _,
-            Instruction::DowngradeRegion { .. },
+            Instruction::DowngradeRegions { .. },
         ))| Some((
             _,
             Instruction::UpgradeRegion { .. }
@@ -151,15 +139,19 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
         let catchup_tasks = self.catchup_tasks.clone();
         let downgrade_tasks = self.downgrade_tasks.clone();
         let flush_tasks = self.flush_tasks.clone();
-        let handler = self.build_handler(instruction)?;
+        let handler = self.build_handler(&instruction)?;
         let _handle = common_runtime::spawn_global(async move {
-            let reply = handler(HandlerContext {
-                region_server,
-                catchup_tasks,
-                downgrade_tasks,
-                flush_tasks,
-            })
-            .await;
+            let reply = handler
+                .handle(
+                    &HandlerContext {
+                        region_server,
+                        catchup_tasks,
+                        downgrade_tasks,
+                        flush_tasks,
+                    },
+                    instruction,
+                )
+                .await;
 
             if let Some(reply) = reply
                 && let Err(e) = mailbox.send((meta, reply)).await
@@ -179,6 +171,7 @@ mod tests {
     use std::sync::Arc;
     use std::time::Duration;
 
+    use common_meta::RegionIdent;
     use common_meta::heartbeat::mailbox::{
         HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
     };
@@ -249,10 +242,10 @@ mod tests {
         );
 
         // Downgrade region
-        let instruction = Instruction::DowngradeRegion(DowngradeRegion {
+        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
             region_id: RegionId::new(2048, 1),
             flush_timeout: Some(Duration::from_secs(1)),
-        });
+        }]);
         assert!(
             heartbeat_handler
                 .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))
@@ -447,10 +440,10 @@ mod tests {
 
         // Should be ok, if we try to downgrade it twice.
         for _ in 0..2 {
             let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
-            let instruction = Instruction::DowngradeRegion(DowngradeRegion {
+            let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
                 region_id,
                 flush_timeout: Some(Duration::from_secs(1)),
-            });
+            }]);
             let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
             let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
@@ -458,33 +451,27 @@
             let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
 
-            if let InstructionReply::DowngradeRegion(reply) = reply {
-                assert!(reply.exists);
-                assert!(reply.error.is_none());
-                assert_eq!(reply.last_entry_id.unwrap(), 0);
-            } else {
-                unreachable!()
-            }
+            let reply = &reply.expect_downgrade_regions_reply()[0];
+            assert!(reply.exists);
+            assert!(reply.error.is_none());
+            assert_eq!(reply.last_entry_id.unwrap(), 0);
         }
 
         // Downgrades a not exists region.
         let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
-        let instruction = Instruction::DowngradeRegion(DowngradeRegion {
+        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
             region_id: RegionId::new(2048, 1),
             flush_timeout: Some(Duration::from_secs(1)),
-        });
+        }]);
         let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
         let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
         assert_matches!(control, HandleControl::Continue);
 
         let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
-        if let InstructionReply::DowngradeRegion(reply) = reply {
-            assert!(!reply.exists);
-            assert!(reply.error.is_none());
-            assert!(reply.last_entry_id.is_none());
-        } else {
-            unreachable!()
-        }
+        let reply = reply.expect_downgrade_regions_reply();
+        assert!(!reply[0].exists);
+        assert!(reply[0].error.is_none());
+        assert!(reply[0].last_entry_id.is_none());
     }
 }
diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs
index c942642731..88ed043fab 100644
--- a/src/datanode/src/heartbeat/handler/close_region.rs
+++ b/src/datanode/src/heartbeat/handler/close_region.rs
@@ -12,60 +12,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_meta::RegionIdent;
-use common_meta::instruction::{InstructionReply, SimpleReply};
+use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
 use common_telemetry::warn;
 use futures::future::join_all;
-use futures_util::future::BoxFuture;
 use store_api::region_request::{RegionCloseRequest, RegionRequest};
+use store_api::storage::RegionId;
 
 use crate::error;
-use crate::heartbeat::handler::HandlerContext;
+use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
 
-impl HandlerContext {
-    pub(crate) fn handle_close_regions_instruction(
-        self,
-        region_idents: Vec<RegionIdent>,
-    ) -> BoxFuture<'static, Option<InstructionReply>> {
-        Box::pin(async move {
-            let region_ids = region_idents
-                .into_iter()
-                .map(|region_ident| Self::region_ident_to_region_id(&region_ident))
-                .collect::<Vec<_>>();
+#[derive(Debug, Clone, Copy, Default)]
+pub struct CloseRegionsHandler;
 
-            let futs = region_ids.iter().map(|region_id| {
-                self.region_server
-                    .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
-            });
+#[async_trait::async_trait]
+impl InstructionHandler for CloseRegionsHandler {
+    async fn handle(
+        &self,
+        ctx: &HandlerContext,
+        instruction: Instruction,
+    ) -> Option<InstructionReply> {
+        // Safety: must be `Instruction::CloseRegions` instruction.
+        let region_idents = instruction.into_close_regions().unwrap();
+        let region_ids = region_idents
+            .into_iter()
+            .map(|region_ident| RegionId::new(region_ident.table_id, region_ident.region_number))
+            .collect::<Vec<_>>();
 
-            let results = join_all(futs).await;
+        let futs = region_ids.iter().map(|region_id| {
+            ctx.region_server
+                .handle_request(*region_id, RegionRequest::Close(RegionCloseRequest {}))
+        });
 
-            let mut errors = vec![];
-            for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) {
-                match result {
-                    Ok(_) => (),
-                    Err(error::Error::RegionNotFound { .. }) => {
-                        warn!(
-                            "Received a close regions instruction from meta, but target region:{} is not found.",
-                            region_id
-                        );
-                    }
-                    Err(err) => errors.push(format!("region:{region_id}: {err:?}")),
+        let results = join_all(futs).await;
+
+        let mut errors = vec![];
+        for (region_id, result) in region_ids.into_iter().zip(results.into_iter()) {
+            match result {
+                Ok(_) => (),
+                Err(error::Error::RegionNotFound { .. }) => {
+                    warn!(
+                        "Received a close regions instruction from meta, but target region:{} is not found.",
+                        region_id
+                    );
                 }
+                Err(err) => errors.push(format!("region:{region_id}: {err:?}")),
             }
+        }
 
-            if errors.is_empty() {
-                return Some(InstructionReply::CloseRegions(SimpleReply {
-                    result: true,
-                    error: None,
-                }));
-            }
+        if errors.is_empty() {
+            return Some(InstructionReply::CloseRegions(SimpleReply {
+                result: true,
+                error: None,
+            }));
+        }
 
-            Some(InstructionReply::CloseRegions(SimpleReply {
-                result: false,
-                error: Some(errors.join("; ")),
-            }))
-        })
+        Some(InstructionReply::CloseRegions(SimpleReply {
+            result: false,
+            error: Some(errors.join("; ")),
+        }))
     }
 }
diff --git a/src/datanode/src/heartbeat/handler/downgrade_region.rs b/src/datanode/src/heartbeat/handler/downgrade_region.rs
index 06d3ab046e..91ceddb91a 100644
--- a/src/datanode/src/heartbeat/handler/downgrade_region.rs
+++ b/src/datanode/src/heartbeat/handler/downgrade_region.rs
@@ -12,209 +12,242 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
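Reviewer note: the refactor above replaces the old `InstructionHandler` type alias (a boxed `FnOnce` returning a `BoxFuture`) with an `#[async_trait]` trait object per instruction kind, so per-handler configuration travels with the handler value. A minimal sketch of that dispatch shape, with placeholder types standing in for the real `Instruction`, `InstructionReply`, and `HandlerContext`:

```rust
use async_trait::async_trait;

// Placeholder stand-ins for the real instruction/reply/context types.
enum Instruction {
    Open(Vec<u32>),
    Close(Vec<u32>),
}
struct Reply(String);
struct Context;

#[async_trait]
trait InstructionHandler: Send + Sync {
    async fn handle(&self, ctx: &Context, instruction: Instruction) -> Option<Reply>;
}

// Per-handler configuration lives on the handler struct instead of being
// captured by a boxed closure.
struct OpenHandler {
    parallelism: usize,
}

#[async_trait]
impl InstructionHandler for OpenHandler {
    async fn handle(&self, _ctx: &Context, instruction: Instruction) -> Option<Reply> {
        let Instruction::Open(ids) = instruction else {
            // Safety: the dispatcher only routes `Open` instructions here.
            unreachable!()
        };
        Some(Reply(format!(
            "opened {} regions with parallelism {}",
            ids.len(),
            self.parallelism
        )))
    }
}

// Dispatch picks a handler by variant, then hands the whole instruction over.
fn build_handler(instruction: &Instruction) -> Box<dyn InstructionHandler> {
    match instruction {
        Instruction::Open(_) => Box::new(OpenHandler { parallelism: 8 }),
        Instruction::Close(_) => unimplemented!("other variants elided in this sketch"),
    }
}

#[tokio::main]
async fn main() {
    let instruction = Instruction::Open(vec![1, 2, 3]);
    let handler = build_handler(&instruction);
    let reply = handler.handle(&Context, instruction).await.unwrap();
    println!("{}", reply.0);
}
```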
-use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply}; +use common_meta::instruction::{ + DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply, +}; use common_telemetry::tracing::info; use common_telemetry::{error, warn}; -use futures_util::future::BoxFuture; +use futures::future::join_all; use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState}; use store_api::region_request::{RegionFlushRequest, RegionRequest}; use store_api::storage::RegionId; -use crate::heartbeat::handler::HandlerContext; +use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; use crate::heartbeat::task_tracker::WaitResult; -impl HandlerContext { - async fn downgrade_to_follower_gracefully( +#[derive(Debug, Clone, Copy, Default)] +pub struct DowngradeRegionsHandler; + +impl DowngradeRegionsHandler { + async fn handle_downgrade_region( + ctx: &HandlerContext, + DowngradeRegion { + region_id, + flush_timeout, + }: DowngradeRegion, + ) -> DowngradeRegionReply { + let Some(writable) = ctx.region_server.is_region_leader(region_id) else { + warn!("Region: {region_id} is not found"); + return DowngradeRegionReply { + region_id, + last_entry_id: None, + metadata_last_entry_id: None, + exists: false, + error: None, + }; + }; + + let region_server_moved = ctx.region_server.clone(); + + // Ignores flush request + if !writable { + warn!( + "Region: {region_id} is not writable, flush_timeout: {:?}", + flush_timeout + ); + return ctx.downgrade_to_follower_gracefully(region_id).await; + } + + // If flush_timeout is not set, directly convert region to follower. + let Some(flush_timeout) = flush_timeout else { + return ctx.downgrade_to_follower_gracefully(region_id).await; + }; + + // Sets region to downgrading, + // the downgrading region will reject all write requests. + // However, the downgrading region will still accept read, flush requests. + match ctx + .region_server + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader) + .await + { + Ok(SetRegionRoleStateResponse::Success { .. 
}) => {}
+            Ok(SetRegionRoleStateResponse::NotFound) => {
+                warn!("Region: {region_id} is not found");
+                return DowngradeRegionReply {
+                    region_id,
+                    last_entry_id: None,
+                    metadata_last_entry_id: None,
+                    exists: false,
+                    error: None,
+                };
+            }
+            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
+                error!(err; "Failed to convert region to downgrading leader - invalid transition");
+                return DowngradeRegionReply {
+                    region_id,
+                    last_entry_id: None,
+                    metadata_last_entry_id: None,
+                    exists: true,
+                    error: Some(format!("{err:?}")),
+                };
+            }
+            Err(err) => {
+                error!(err; "Failed to convert region to downgrading leader");
+                return DowngradeRegionReply {
+                    region_id,
+                    last_entry_id: None,
+                    metadata_last_entry_id: None,
+                    exists: true,
+                    error: Some(format!("{err:?}")),
+                };
+            }
+        }
+
+        let register_result = ctx
+            .downgrade_tasks
+            .try_register(
+                region_id,
+                Box::pin(async move {
+                    info!("Flush region: {region_id} before converting region to follower");
+                    region_server_moved
+                        .handle_request(
+                            region_id,
+                            RegionRequest::Flush(RegionFlushRequest {
+                                row_group_size: None,
+                            }),
+                        )
+                        .await?;
+
+                    Ok(())
+                }),
+            )
+            .await;
+
+        if register_result.is_busy() {
+            warn!("Another flush task is running for the region: {region_id}");
+        }
+
+        let mut watcher = register_result.into_watcher();
+        let result = ctx.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
+
+        match result {
+            WaitResult::Timeout => DowngradeRegionReply {
+                region_id,
+                last_entry_id: None,
+                metadata_last_entry_id: None,
+                exists: true,
+                error: Some(format!(
+                    "Flush region timeout, region: {region_id}, timeout: {:?}",
+                    flush_timeout
+                )),
+            },
+            WaitResult::Finish(Ok(_)) => ctx.downgrade_to_follower_gracefully(region_id).await,
+            WaitResult::Finish(Err(err)) => DowngradeRegionReply {
+                region_id,
+                last_entry_id: None,
+                metadata_last_entry_id: None,
+                exists: true,
+                error: Some(format!("{err:?}")),
+            },
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl InstructionHandler for DowngradeRegionsHandler {
+    async fn handle(
         &self,
-        region_id: RegionId,
+        ctx: &HandlerContext,
+        instruction: Instruction,
     ) -> Option<InstructionReply> {
+        // Safety: must be `Instruction::DowngradeRegions` instruction.
+        let downgrade_regions = instruction.into_downgrade_regions().unwrap();
+        let futures = downgrade_regions
+            .into_iter()
+            .map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
+        // Join all futures; parallelism is governed by the underlying flush scheduler.
+        let results = join_all(futures).await;
+
+        Some(InstructionReply::DowngradeRegions(
+            DowngradeRegionsReply::new(results),
+        ))
+    }
+}
+
+impl HandlerContext {
+    async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> DowngradeRegionReply {
         match self
             .region_server
             .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
             .await
         {
-            Ok(SetRegionRoleStateResponse::Success(success)) => {
-                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
-                    last_entry_id: success.last_entry_id(),
-                    metadata_last_entry_id: success.metadata_last_entry_id(),
-                    exists: true,
-                    error: None,
-                }))
-            }
+            Ok(SetRegionRoleStateResponse::Success(success)) => DowngradeRegionReply {
+                region_id,
+                last_entry_id: success.last_entry_id(),
+                metadata_last_entry_id: success.metadata_last_entry_id(),
+                exists: true,
+                error: None,
+            },
             Ok(SetRegionRoleStateResponse::NotFound) => {
                 warn!("Region: {region_id} is not found");
-                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
+                DowngradeRegionReply {
+                    region_id,
                     last_entry_id: None,
                     metadata_last_entry_id: None,
                     exists: false,
                     error: None,
-                }))
+                }
             }
             Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
                 error!(err; "Failed to convert region to follower - invalid transition");
-                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
+                DowngradeRegionReply {
+                    region_id,
                     last_entry_id: None,
                     metadata_last_entry_id: None,
                     exists: true,
                     error: Some(format!("{err:?}")),
-                }))
+                }
             }
             Err(err) => {
                 error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
-                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
+                DowngradeRegionReply {
+                    region_id,
                     last_entry_id: None,
                     metadata_last_entry_id: None,
                     exists: true,
                     error: Some(format!("{err:?}")),
-                }))
+                }
             }
         }
     }
-
-    pub(crate) fn handle_downgrade_region_instruction(
-        self,
-        DowngradeRegion {
-            region_id,
-            flush_timeout,
-        }: DowngradeRegion,
-    ) -> BoxFuture<'static, Option<InstructionReply>> {
-        Box::pin(async move {
-            let Some(writable) = self.region_server.is_region_leader(region_id) else {
-                warn!("Region: {region_id} is not found");
-                return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
-                    last_entry_id: None,
-                    metadata_last_entry_id: None,
-                    exists: false,
-                    error: None,
-                }));
-            };
-
-            let region_server_moved = self.region_server.clone();
-
-            // Ignores flush request
-            if !writable {
-                warn!(
-                    "Region: {region_id} is not writable, flush_timeout: {:?}",
-                    flush_timeout
-                );
-                return self.downgrade_to_follower_gracefully(region_id).await;
-            }
-
-            // If flush_timeout is not set, directly convert region to follower.
-            let Some(flush_timeout) = flush_timeout else {
-                return self.downgrade_to_follower_gracefully(region_id).await;
-            };
-
-            // Sets region to downgrading,
-            // the downgrading region will reject all write requests.
-            // However, the downgrading region will still accept read, flush requests.
-            match self
-                .region_server
-                .set_region_role_state_gracefully(
-                    region_id,
-                    SettableRegionRoleState::DowngradingLeader,
-                )
-                .await
-            {
-                Ok(SetRegionRoleStateResponse::Success { ..
}) => {} - Ok(SetRegionRoleStateResponse::NotFound) => { - warn!("Region: {region_id} is not found"); - return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id: None, - metadata_last_entry_id: None, - exists: false, - error: None, - })); - } - Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => { - error!(err; "Failed to convert region to downgrading leader - invalid transition"); - return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id: None, - metadata_last_entry_id: None, - exists: true, - error: Some(format!("{err:?}")), - })); - } - Err(err) => { - error!(err; "Failed to convert region to downgrading leader"); - return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id: None, - metadata_last_entry_id: None, - exists: true, - error: Some(format!("{err:?}")), - })); - } - } - - let register_result = self - .downgrade_tasks - .try_register( - region_id, - Box::pin(async move { - info!("Flush region: {region_id} before converting region to follower"); - region_server_moved - .handle_request( - region_id, - RegionRequest::Flush(RegionFlushRequest { - row_group_size: None, - }), - ) - .await?; - - Ok(()) - }), - ) - .await; - - if register_result.is_busy() { - warn!("Another flush task is running for the region: {region_id}"); - } - - let mut watcher = register_result.into_watcher(); - let result = self.downgrade_tasks.wait(&mut watcher, flush_timeout).await; - - match result { - WaitResult::Timeout => { - Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id: None, - metadata_last_entry_id: None, - exists: true, - error: Some(format!( - "Flush region timeout, region: {region_id}, timeout: {:?}", - flush_timeout - )), - })) - } - WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await, - WaitResult::Finish(Err(err)) => { - Some(InstructionReply::DowngradeRegion(DowngradeRegionReply { - last_entry_id: None, - metadata_last_entry_id: None, - exists: true, - error: Some(format!("{err:?}")), - })) - } - } - }) - } } #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::sync::Arc; use std::time::Duration; - use common_meta::instruction::{DowngradeRegion, InstructionReply}; + use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; + use common_meta::heartbeat::mailbox::MessageMeta; + use common_meta::instruction::{DowngradeRegion, Instruction}; + use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; + use mito2::test_util::{CreateRequestBuilder, TestEnv}; use store_api::region_engine::{ - RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, + RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess, }; use store_api::region_request::RegionRequest; use store_api::storage::RegionId; use tokio::time::Instant; use crate::error; - use crate::heartbeat::handler::HandlerContext; + use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler; + use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv; + use crate::heartbeat::handler::{ + HandlerContext, InstructionHandler, RegionHeartbeatResponseHandler, + }; use crate::tests::{MockRegionEngine, mock_region_server}; #[tokio::test] @@ -227,20 +260,20 @@ mod tests { let waits = vec![None, Some(Duration::from_millis(100u64))]; for flush_timeout in waits { - let reply = handler_context - .clone() - .handle_downgrade_region_instruction(DowngradeRegion { - region_id, - flush_timeout, - }) + let reply = 
DowngradeRegionsHandler + .handle( + &handler_context, + Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id, + flush_timeout, + }]), + ) .await; - assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { - assert!(!reply.exists); - assert!(reply.error.is_none()); - assert!(reply.last_entry_id.is_none()); - } + let reply = &reply.unwrap().expect_downgrade_regions_reply()[0]; + assert!(!reply.exists); + assert!(reply.error.is_none()); + assert!(reply.last_entry_id.is_none()); } } @@ -270,20 +303,20 @@ mod tests { let waits = vec![None, Some(Duration::from_millis(100u64))]; for flush_timeout in waits { - let reply = handler_context - .clone() - .handle_downgrade_region_instruction(DowngradeRegion { - region_id, - flush_timeout, - }) + let reply = DowngradeRegionsHandler + .handle( + &handler_context, + Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id, + flush_timeout, + }]), + ) .await; - assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { - assert!(reply.exists); - assert!(reply.error.is_none()); - assert_eq!(reply.last_entry_id.unwrap(), 1024); - } + let reply = &reply.unwrap().expect_downgrade_regions_reply()[0]; + assert!(reply.exists); + assert!(reply.error.is_none()); + assert_eq!(reply.last_entry_id.unwrap(), 1024); } } @@ -305,20 +338,20 @@ mod tests { let handler_context = HandlerContext::new_for_test(mock_region_server); let flush_timeout = Duration::from_millis(100); - let reply = handler_context - .clone() - .handle_downgrade_region_instruction(DowngradeRegion { - region_id, - flush_timeout: Some(flush_timeout), - }) + let reply = DowngradeRegionsHandler + .handle( + &handler_context, + Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id, + flush_timeout: Some(flush_timeout), + }]), + ) .await; - assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { - assert!(reply.exists); - assert!(reply.error.unwrap().contains("timeout")); - assert!(reply.last_entry_id.is_none()); - } + let reply = &reply.unwrap().expect_downgrade_regions_reply()[0]; + assert!(reply.exists); + assert!(reply.error.as_ref().unwrap().contains("timeout")); + assert!(reply.last_entry_id.is_none()); } #[tokio::test] @@ -344,36 +377,38 @@ mod tests { ]; for flush_timeout in waits { - let reply = handler_context - .clone() - .handle_downgrade_region_instruction(DowngradeRegion { - region_id, - flush_timeout, - }) + let reply = DowngradeRegionsHandler + .handle( + &handler_context, + Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id, + flush_timeout, + }]), + ) .await; - assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { - assert!(reply.exists); - assert!(reply.error.unwrap().contains("timeout")); - assert!(reply.last_entry_id.is_none()); - } + + let reply = &reply.unwrap().expect_downgrade_regions_reply()[0]; + assert!(reply.exists); + assert!(reply.error.as_ref().unwrap().contains("timeout")); + assert!(reply.last_entry_id.is_none()); } let timer = Instant::now(); - let reply = handler_context - .handle_downgrade_region_instruction(DowngradeRegion { - region_id, - flush_timeout: Some(Duration::from_millis(500)), - }) + let reply = DowngradeRegionsHandler + .handle( + &handler_context, + 
Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id, + flush_timeout: Some(Duration::from_millis(500)), + }]), + ) .await; - assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); // Must less than 300 ms. assert!(timer.elapsed().as_millis() < 300); - if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { - assert!(reply.exists); - assert!(reply.error.is_none()); - assert_eq!(reply.last_entry_id.unwrap(), 1024); - } + let reply = &reply.unwrap().expect_downgrade_regions_reply()[0]; + assert!(reply.exists); + assert!(reply.error.is_none()); + assert_eq!(reply.last_entry_id.unwrap(), 1024); } #[tokio::test] @@ -405,36 +440,36 @@ mod tests { ]; for flush_timeout in waits { - let reply = handler_context - .clone() - .handle_downgrade_region_instruction(DowngradeRegion { - region_id, - flush_timeout, - }) + let reply = DowngradeRegionsHandler + .handle( + &handler_context, + Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id, + flush_timeout, + }]), + ) .await; - assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { - assert!(reply.exists); - assert!(reply.error.unwrap().contains("timeout")); - assert!(reply.last_entry_id.is_none()); - } - } - let timer = Instant::now(); - let reply = handler_context - .handle_downgrade_region_instruction(DowngradeRegion { - region_id, - flush_timeout: Some(Duration::from_millis(500)), - }) - .await; - assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - // Must less than 300 ms. - assert!(timer.elapsed().as_millis() < 300); - - if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { + let reply = &reply.unwrap().expect_downgrade_regions_reply()[0]; assert!(reply.exists); - assert!(reply.error.unwrap().contains("flush failed")); + assert!(reply.error.as_ref().unwrap().contains("timeout")); assert!(reply.last_entry_id.is_none()); } + let timer = Instant::now(); + let reply = DowngradeRegionsHandler + .handle( + &handler_context, + Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id, + flush_timeout: Some(Duration::from_millis(500)), + }]), + ) + .await; + // Must less than 300 ms. 
+ assert!(timer.elapsed().as_millis() < 300); + let reply = &reply.unwrap().expect_downgrade_regions_reply()[0]; + assert!(reply.exists); + assert!(reply.error.as_ref().unwrap().contains("flush failed")); + assert!(reply.last_entry_id.is_none()); } #[tokio::test] @@ -449,19 +484,19 @@ mod tests { }); mock_region_server.register_test_region(region_id, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); - let reply = handler_context - .clone() - .handle_downgrade_region_instruction(DowngradeRegion { - region_id, - flush_timeout: None, - }) + let reply = DowngradeRegionsHandler + .handle( + &handler_context, + Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id, + flush_timeout: None, + }]), + ) .await; - assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { - assert!(!reply.exists); - assert!(reply.error.is_none()); - assert!(reply.last_entry_id.is_none()); - } + let reply = &reply.unwrap().expect_downgrade_regions_reply()[0]; + assert!(!reply.exists); + assert!(reply.error.is_none()); + assert!(reply.last_entry_id.is_none()); } #[tokio::test] @@ -480,23 +515,77 @@ mod tests { }); mock_region_server.register_test_region(region_id, mock_engine); let handler_context = HandlerContext::new_for_test(mock_region_server); - let reply = handler_context - .clone() - .handle_downgrade_region_instruction(DowngradeRegion { - region_id, - flush_timeout: None, - }) + let reply = DowngradeRegionsHandler + .handle( + &handler_context, + Instruction::DowngradeRegions(vec![DowngradeRegion { + region_id, + flush_timeout: None, + }]), + ) .await; - assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_))); - if let InstructionReply::DowngradeRegion(reply) = reply.unwrap() { - assert!(reply.exists); - assert!( - reply - .error - .unwrap() - .contains("Failed to set region to readonly") - ); - assert!(reply.last_entry_id.is_none()); - } + let reply = &reply.unwrap().expect_downgrade_regions_reply()[0]; + assert!(reply.exists); + assert!( + reply + .error + .as_ref() + .unwrap() + .contains("Failed to set region to readonly") + ); + assert!(reply.last_entry_id.is_none()); + } + + #[tokio::test] + async fn test_downgrade_regions() { + common_telemetry::init_default_ut_logging(); + + let mut region_server = mock_region_server(); + let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); + let mut engine_env = TestEnv::with_prefix("downgrade-regions").await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine.clone())); + let region_id = RegionId::new(1024, 1); + let region_id1 = RegionId::new(1024, 2); + let builder = CreateRequestBuilder::new(); + let create_req = builder.build(); + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + let create_req1 = builder.build(); + region_server + .handle_request(region_id1, RegionRequest::Create(create_req1)) + .await + .unwrap(); + let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0"); + let instruction = Instruction::DowngradeRegions(vec![ + DowngradeRegion { + region_id, + flush_timeout: Some(Duration::from_secs(1)), + }, + DowngradeRegion { + region_id: region_id1, + flush_timeout: Some(Duration::from_secs(1)), + }, + ]); + let mut heartbeat_env = HeartbeatResponseTestEnv::new(); + let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); + let control = 
heartbeat_handler.handle(&mut ctx).await.unwrap();
+        assert_matches!(control, HandleControl::Continue);
+
+        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
+        let reply = reply.expect_downgrade_regions_reply();
+        assert_eq!(reply[0].region_id, region_id);
+        assert!(reply[0].exists);
+        assert!(reply[0].error.is_none());
+        assert_eq!(reply[0].last_entry_id, Some(0));
+        assert_eq!(reply[1].region_id, region_id1);
+        assert!(reply[1].exists);
+        assert!(reply[1].error.is_none());
+        assert_eq!(reply[1].last_entry_id, Some(0));
+
+        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
+        assert_eq!(engine.role(region_id1).unwrap(), RegionRole::Follower);
+    }
 }
diff --git a/src/datanode/src/heartbeat/handler/flush_region.rs b/src/datanode/src/heartbeat/handler/flush_region.rs
index 963d3bf488..56b841bf00 100644
--- a/src/datanode/src/heartbeat/handler/flush_region.rs
+++ b/src/datanode/src/heartbeat/handler/flush_region.rs
@@ -15,19 +15,53 @@
 use std::time::Instant;
 
 use common_meta::instruction::{
-    FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
+    FlushErrorStrategy, FlushRegionReply, FlushStrategy, Instruction, InstructionReply,
 };
 use common_telemetry::{debug, warn};
-use futures_util::future::BoxFuture;
 use store_api::region_request::{RegionFlushRequest, RegionRequest};
 use store_api::storage::RegionId;
 
-use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, UnexpectedSnafu};
-use crate::heartbeat::handler::HandlerContext;
+use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
+use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
+
+pub struct FlushRegionsHandler;
+
+#[async_trait::async_trait]
+impl InstructionHandler for FlushRegionsHandler {
+    async fn handle(
+        &self,
+        ctx: &HandlerContext,
+        instruction: Instruction,
+    ) -> Option<InstructionReply> {
+        let start_time = Instant::now();
+        let flush_regions = instruction.into_flush_regions().unwrap();
+        let strategy = flush_regions.strategy;
+        let region_ids = flush_regions.region_ids;
+        let error_strategy = flush_regions.error_strategy;
+
+        let reply = if matches!(strategy, FlushStrategy::Async) {
+            // Asynchronous hint mode: fire-and-forget, no reply expected
+            ctx.handle_flush_hint(region_ids).await;
+            None
+        } else {
+            // Synchronous mode: return reply with results
+            let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
+            Some(InstructionReply::FlushRegions(reply))
+        };
+
+        let elapsed = start_time.elapsed();
+        debug!(
+            "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
+            strategy, elapsed, reply
+        );
+
+        reply
+    }
+}
 
 impl HandlerContext {
     /// Performs the actual region flush operation.
-    async fn perform_region_flush(&self, region_id: RegionId) -> Result<(), error::Error> {
+    async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
         let request = RegionRequest::Flush(RegionFlushRequest {
             row_group_size: None,
         });
@@ -92,7 +126,7 @@ impl HandlerContext {
     }
 
     /// Flushes a single region synchronously with proper error handling.
-    async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<(), error::Error> {
+    async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
         // Check if region is leader and writable
         let Some(writable) = self.region_server.is_region_leader(region_id) else {
             return Err(RegionNotFoundSnafu { region_id }.build());
@@ -135,37 +169,6 @@ impl HandlerContext {
             .build()),
         }
     }
-
-    /// Unified handler for FlushRegions with all flush semantics.
-    pub(crate) fn handle_flush_regions_instruction(
-        self,
-        flush_regions: FlushRegions,
-    ) -> BoxFuture<'static, Option<InstructionReply>> {
-        Box::pin(async move {
-            let start_time = Instant::now();
-            let strategy = flush_regions.strategy;
-            let region_ids = flush_regions.region_ids;
-            let error_strategy = flush_regions.error_strategy;
-
-            let reply = if matches!(strategy, FlushStrategy::Async) {
-                // Asynchronous hint mode: fire-and-forget, no reply expected
-                self.handle_flush_hint(region_ids).await;
-                None
-            } else {
-                // Synchronous mode: return reply with results
-                let reply = self.handle_flush_sync(region_ids, error_strategy).await;
-                Some(InstructionReply::FlushRegions(reply))
-            };
-
-            let elapsed = start_time.elapsed();
-            debug!(
-                "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
-                strategy, elapsed, reply
-            );
-
-            reply
-        })
-    }
 }
 
 #[cfg(test)]
@@ -201,9 +204,11 @@ mod tests {
 
         // Async hint mode
         let flush_instruction = FlushRegions::async_batch(region_ids.clone());
-        let reply = handler_context
-            .clone()
-            .handle_flush_regions_instruction(flush_instruction)
+        let reply = FlushRegionsHandler
+            .handle(
+                &handler_context,
+                Instruction::FlushRegions(flush_instruction),
+            )
             .await;
         assert!(reply.is_none()); // Hint mode returns no reply
         assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
@@ -212,8 +217,11 @@ mod tests {
         flushed_region_ids.write().unwrap().clear();
         let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
         let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
-        let reply = handler_context
-            .handle_flush_regions_instruction(flush_instruction)
+        let reply = FlushRegionsHandler
+            .handle(
+                &handler_context,
+                Instruction::FlushRegions(flush_instruction),
+            )
             .await;
         assert!(reply.is_none());
         assert!(flushed_region_ids.read().unwrap().is_empty());
@@ -238,20 +246,17 @@ mod tests {
         let handler_context = HandlerContext::new_for_test(mock_region_server);
 
         let flush_instruction = FlushRegions::sync_single(region_id);
-        let reply = handler_context
-            .handle_flush_regions_instruction(flush_instruction)
+        let reply = FlushRegionsHandler
+            .handle(
+                &handler_context,
+                Instruction::FlushRegions(flush_instruction),
+            )
             .await;
-
-        assert!(reply.is_some());
-        if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
-            assert!(flush_reply.overall_success);
-            assert_eq!(flush_reply.results.len(), 1);
-            assert_eq!(flush_reply.results[0].0, region_id);
-            assert!(flush_reply.results[0].1.is_ok());
-        } else {
-            panic!("Expected FlushRegions reply");
-        }
-
+        let flush_reply = reply.unwrap().expect_flush_regions_reply();
+        assert!(flush_reply.overall_success);
+        assert_eq!(flush_reply.results.len(), 1);
+        assert_eq!(flush_reply.results[0].0, region_id);
+        assert!(flush_reply.results[0].1.is_ok());
         assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
     }
 
@@ -281,18 +286,16 @@ mod tests {
         // Sync batch with fail-fast strategy
         let flush_instruction =
             FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
-        let reply = handler_context
-            .handle_flush_regions_instruction(flush_instruction)
+        let reply = FlushRegionsHandler
+            .handle(
+                &handler_context,
+                Instruction::FlushRegions(flush_instruction),
+            )
            .await;
-
-        assert!(reply.is_some());
-        if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
-            assert!(!flush_reply.overall_success); // Should fail due to non-existent regions
-            // With fail-fast, only process regions until first failure
-            assert!(flush_reply.results.len() <=
region_ids.len()); - } else { - panic!("Expected FlushRegions reply"); - } + let flush_reply = reply.unwrap().expect_flush_regions_reply(); + assert!(!flush_reply.overall_success); // Should fail due to non-existent regions + // With fail-fast, only process regions until first failure + assert!(flush_reply.results.len() <= region_ids.len()); } #[tokio::test] @@ -317,20 +320,18 @@ mod tests { // Sync batch with try-all strategy let flush_instruction = FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll); - let reply = handler_context - .handle_flush_regions_instruction(flush_instruction) + let reply = FlushRegionsHandler + .handle( + &handler_context, + Instruction::FlushRegions(flush_instruction), + ) .await; - - assert!(reply.is_some()); - if let Some(InstructionReply::FlushRegions(flush_reply)) = reply { - assert!(!flush_reply.overall_success); // Should fail due to one non-existent region - // With try-all, should process all regions - assert_eq!(flush_reply.results.len(), region_ids.len()); - // First should succeed, second should fail - assert!(flush_reply.results[0].1.is_ok()); - assert!(flush_reply.results[1].1.is_err()); - } else { - panic!("Expected FlushRegions reply"); - } + let flush_reply = reply.unwrap().expect_flush_regions_reply(); + assert!(!flush_reply.overall_success); // Should fail due to one non-existent region + // With try-all, should process all regions + assert_eq!(flush_reply.results.len(), region_ids.len()); + // First should succeed, second should fail + assert!(flush_reply.results[0].1.is_ok()); + assert!(flush_reply.results[1].1.is_err()); } } diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index e6ea973eec..77cd4fe6a0 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -12,56 +12,62 @@ // See the License for the specific language governing permissions and // limitations under the License. 
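Reviewer note on the flush tests above: `FlushStrategy::Async` is fire-and-forget (no reply at all), while the sync path's error handling is controlled by `FlushErrorStrategy`, where `FailFast` stops at the first failed region and `TryAll` attempts every region. A self-contained sketch of that control flow; the `flush` stub and the types here are illustrative, not the crate's API:

```rust
#[derive(Clone, Copy, Debug)]
enum ErrorStrategy {
    FailFast, // stop at the first failed region
    TryAll,   // attempt every region, collect all results
}

// Stand-in for a real flush call: even ids succeed, odd ids fail.
fn flush(region_id: u32) -> Result<(), String> {
    if region_id % 2 == 0 {
        Ok(())
    } else {
        Err(format!("region {region_id} not found"))
    }
}

fn flush_batch(
    region_ids: &[u32],
    strategy: ErrorStrategy,
) -> (bool, Vec<(u32, Result<(), String>)>) {
    let mut results = Vec::new();
    let mut overall_success = true;
    for &id in region_ids {
        let result = flush(id);
        let failed = result.is_err();
        results.push((id, result));
        if failed {
            overall_success = false;
            if matches!(strategy, ErrorStrategy::FailFast) {
                break; // later regions are never attempted
            }
        }
    }
    (overall_success, results)
}

fn main() {
    let regions = [2, 3, 4];

    let (ok, results) = flush_batch(&regions, ErrorStrategy::FailFast);
    assert!(!ok);
    assert_eq!(results.len(), 2); // stopped after region 3 failed

    let (ok, results) = flush_batch(&regions, ErrorStrategy::TryAll);
    assert!(!ok);
    assert_eq!(results.len(), 3); // all regions attempted
}
```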
-use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
+use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
 use common_meta::wal_options_allocator::prepare_wal_options;
-use futures_util::future::BoxFuture;
 use store_api::path_utils::table_dir;
 use store_api::region_request::{PathType, RegionOpenRequest};
+use store_api::storage::RegionId;
 
-use crate::heartbeat::handler::HandlerContext;
+use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
 
-impl HandlerContext {
-    pub(crate) fn handle_open_regions_instruction(
-        self,
-        open_regions: Vec<OpenRegion>,
-        open_region_parallelism: usize,
-    ) -> BoxFuture<'static, Option<InstructionReply>> {
-        Box::pin(async move {
-            let requests = open_regions
-                .into_iter()
-                .map(|open_region| {
-                    let OpenRegion {
-                        region_ident,
-                        region_storage_path,
-                        mut region_options,
-                        region_wal_options,
-                        skip_wal_replay,
-                    } = open_region;
-                    let region_id = Self::region_ident_to_region_id(&region_ident);
-                    prepare_wal_options(&mut region_options, region_id, &region_wal_options);
-                    let request = RegionOpenRequest {
-                        engine: region_ident.engine,
-                        table_dir: table_dir(&region_storage_path, region_id.table_id()),
-                        path_type: PathType::Bare,
-                        options: region_options,
-                        skip_wal_replay,
-                        checkpoint: None,
-                    };
-                    (region_id, request)
-                })
-                .collect::<Vec<_>>();
+pub struct OpenRegionsHandler {
+    pub open_region_parallelism: usize,
+}
 
-            let result = self
-                .region_server
-                .handle_batch_open_requests(open_region_parallelism, requests, false)
-                .await;
-            let success = result.is_ok();
-            let error = result.as_ref().map_err(|e| format!("{e:?}")).err();
-            Some(InstructionReply::OpenRegions(SimpleReply {
-                result: success,
-                error,
-            }))
-        })
+#[async_trait::async_trait]
+impl InstructionHandler for OpenRegionsHandler {
+    async fn handle(
+        &self,
+        ctx: &HandlerContext,
+        instruction: Instruction,
+    ) -> Option<InstructionReply> {
+        let open_regions = instruction.into_open_regions().unwrap();
+
+        let requests = open_regions
+            .into_iter()
+            .map(|open_region| {
+                let OpenRegion {
+                    region_ident,
+                    region_storage_path,
+                    mut region_options,
+                    region_wal_options,
+                    skip_wal_replay,
+                } = open_region;
+                let region_id = RegionId::new(region_ident.table_id, region_ident.region_number);
+                prepare_wal_options(&mut region_options, region_id, &region_wal_options);
+                let request = RegionOpenRequest {
+                    engine: region_ident.engine,
+                    table_dir: table_dir(&region_storage_path, region_id.table_id()),
+                    path_type: PathType::Bare,
+                    options: region_options,
+                    skip_wal_replay,
+                    checkpoint: None,
+                };
+                (region_id, request)
+            })
+            .collect::<Vec<_>>();
+
+        let result = ctx
+            .region_server
+            .handle_batch_open_requests(self.open_region_parallelism, requests, false)
+            .await;
+        let success = result.is_ok();
+        let error = result.as_ref().map_err(|e| format!("{e:?}")).err();
+
+        Some(InstructionReply::OpenRegions(SimpleReply {
+            result: success,
+            error,
+        }))
     }
 }
diff --git a/src/datanode/src/heartbeat/handler/upgrade_region.rs b/src/datanode/src/heartbeat/handler/upgrade_region.rs
index c1f238e059..239eaf1e4c 100644
--- a/src/datanode/src/heartbeat/handler/upgrade_region.rs
+++ b/src/datanode/src/heartbeat/handler/upgrade_region.rs
@@ -12,18 +12,24 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
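A note on the `region_id` integers appearing in this PR's fixtures: `OpenRegionsHandler` above rebuilds a `RegionId` from `table_id` and `region_number`, and the serialization tests expect `RegionId::new(1024, 1)` to serialize as `4398046511105`. That value is consistent with packing the table id into the high 32 bits of a u64, which this quick self-contained check verifies (the `pack_region_id` helper is illustrative, not the store-api function):

```rust
// RegionId::new(1024, 1) appears as 4398046511105 in the test fixtures,
// which matches a (table_id << 32) | region_number packing.
fn pack_region_id(table_id: u32, region_number: u32) -> u64 {
    ((table_id as u64) << 32) | region_number as u64
}

fn main() {
    let id = pack_region_id(1024, 1);
    assert_eq!(id, 4398046511105);
    // The two halves round-trip cleanly.
    assert_eq!((id >> 32) as u32, 1024);
    assert_eq!(id as u32, 1);
}
```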
-use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
+use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
 use common_telemetry::{info, warn};
-use futures_util::future::BoxFuture;
 use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
 
-use crate::heartbeat::handler::HandlerContext;
+use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
 use crate::heartbeat::task_tracker::WaitResult;
 
-impl HandlerContext {
-    pub(crate) fn handle_upgrade_region_instruction(
-        self,
-        UpgradeRegion {
+#[derive(Debug, Clone, Copy, Default)]
+pub struct UpgradeRegionsHandler;
+
+#[async_trait::async_trait]
+impl InstructionHandler for UpgradeRegionsHandler {
+    async fn handle(
+        &self,
+        ctx: &HandlerContext,
+        instruction: Instruction,
+    ) -> Option<InstructionReply> {
+        let UpgradeRegion {
             region_id,
             last_entry_id,
             metadata_last_entry_id,
@@ -31,116 +37,116 @@ impl HandlerContext {
             location_id,
             replay_entry_id,
             metadata_replay_entry_id,
-        }: UpgradeRegion,
-    ) -> BoxFuture<'static, Option<InstructionReply>> {
-        Box::pin(async move {
-            let Some(writable) = self.region_server.is_region_leader(region_id) else {
-                return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
-                    ready: false,
-                    exists: false,
-                    error: None,
-                }));
-            };
+        } = instruction.into_upgrade_regions().unwrap();
 
-            if writable {
-                return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
+        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
+            return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
+                ready: false,
+                exists: false,
+                error: None,
+            }));
+        };
+
+        if writable {
+            return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
+                ready: true,
+                exists: true,
+                error: None,
+            }));
+        }
+
+        let region_server_moved = ctx.region_server.clone();
+
+        let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
+            (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
+                entry_id,
+                metadata_entry_id,
+            }),
+            _ => None,
+        };
+
+        // The catchup task is almost zero cost if the inside region is writable.
+        // Therefore, it always registers a new catchup task.
+        let register_result = ctx
+            .catchup_tasks
+            .try_register(
+                region_id,
+                Box::pin(async move {
+                    info!(
+                        "Executing region: {region_id} catchup to: last entry id {last_entry_id:?}"
+                    );
+                    region_server_moved
+                        .handle_request(
+                            region_id,
+                            RegionRequest::Catchup(RegionCatchupRequest {
+                                set_writable: true,
+                                entry_id: last_entry_id,
+                                metadata_entry_id: metadata_last_entry_id,
+                                location_id,
+                                checkpoint,
+                            }),
+                        )
+                        .await?;
+
+                    Ok(())
+                }),
+            )
+            .await;
+
+        if register_result.is_busy() {
+            warn!("Another catchup task is running for the region: {region_id}");
+        }
+
+        // Returns immediately
+        let Some(replay_timeout) = replay_timeout else {
+            return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
+                ready: false,
+                exists: true,
+                error: None,
+            }));
+        };
+
+        // We don't care that it returns a newly registered or running task.
+ let mut watcher = register_result.into_watcher(); + let result = ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await; + + match result { + WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { + ready: false, + exists: true, + error: None, + })), + WaitResult::Finish(Ok(_)) => { + Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { ready: true, exists: true, error: None, - })); + })) } - - let region_server_moved = self.region_server.clone(); - - let checkpoint = match (replay_entry_id, metadata_replay_entry_id) { - (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint { - entry_id, - metadata_entry_id, - }), - _ => None, - }; - - // The catchup task is almost zero cost if the inside region is writable. - // Therefore, it always registers a new catchup task. - let register_result = self - .catchup_tasks - .try_register( - region_id, - Box::pin(async move { - info!("Executing region: {region_id} catchup to: last entry id {last_entry_id:?}"); - region_server_moved - .handle_request( - region_id, - RegionRequest::Catchup(RegionCatchupRequest { - set_writable: true, - entry_id: last_entry_id, - metadata_entry_id: metadata_last_entry_id, - location_id, - checkpoint, - }), - ) - .await?; - - Ok(()) - }), - ) - .await; - - if register_result.is_busy() { - warn!("Another catchup task is running for the region: {region_id}"); - } - - // Returns immediately - let Some(replay_timeout) = replay_timeout else { - return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { + WaitResult::Finish(Err(err)) => { + Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { ready: false, exists: true, - error: None, - })); - }; - - // We don't care that it returns a newly registered or running task. - let mut watcher = register_result.into_watcher(); - let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await; - - match result { - WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: false, - exists: true, - error: None, - })), - WaitResult::Finish(Ok(_)) => { - Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: true, - exists: true, - error: None, - })) - } - WaitResult::Finish(Err(err)) => { - Some(InstructionReply::UpgradeRegion(UpgradeRegionReply { - ready: false, - exists: true, - error: Some(format!("{err:?}")), - })) - } + error: Some(format!("{err:?}")), + })) } - }) + } } } #[cfg(test)] mod tests { - use std::assert_matches::assert_matches; use std::time::Duration; - use common_meta::instruction::{InstructionReply, UpgradeRegion}; + use common_meta::instruction::{Instruction, UpgradeRegion}; use mito2::engine::MITO_ENGINE_NAME; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; use tokio::time::Instant; use crate::error; - use crate::heartbeat::handler::HandlerContext; + use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler; + use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; use crate::tests::{MockRegionEngine, mock_region_server}; #[tokio::test] @@ -155,20 +161,20 @@ mod tests { let waits = vec![None, Some(Duration::from_millis(100u64))]; for replay_timeout in waits { - let reply = handler_context - .clone() - .handle_upgrade_region_instruction(UpgradeRegion { - region_id, - replay_timeout, - ..Default::default() - }) + let reply = UpgradeRegionsHandler + .handle( + &handler_context, + Instruction::UpgradeRegion(UpgradeRegion { + region_id, + replay_timeout, + ..Default::default() + }), + ) .await; - assert_matches!(reply, 
-            if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
-                assert!(!reply.exists);
-                assert!(reply.error.is_none());
-            }
+            let reply = reply.unwrap().expect_upgrade_region_reply();
+            assert!(!reply.exists);
+            assert!(reply.error.is_none());
         }
     }
 
@@ -192,21 +198,21 @@ mod tests {
         let waits = vec![None, Some(Duration::from_millis(100u64))];
 
         for replay_timeout in waits {
-            let reply = handler_context
-                .clone()
-                .handle_upgrade_region_instruction(UpgradeRegion {
-                    region_id,
-                    replay_timeout,
-                    ..Default::default()
-                })
+            let reply = UpgradeRegionsHandler
+                .handle(
+                    &handler_context,
+                    Instruction::UpgradeRegion(UpgradeRegion {
+                        region_id,
+                        replay_timeout,
+                        ..Default::default()
+                    }),
+                )
                 .await;
-            assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
-            if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
-                assert!(reply.ready);
-                assert!(reply.exists);
-                assert!(reply.error.is_none());
-            }
+            let reply = reply.unwrap().expect_upgrade_region_reply();
+            assert!(reply.ready);
+            assert!(reply.exists);
+            assert!(reply.error.is_none());
         }
     }
 
@@ -230,21 +236,21 @@ mod tests {
         let waits = vec![None, Some(Duration::from_millis(100u64))];
 
         for replay_timeout in waits {
-            let reply = handler_context
-                .clone()
-                .handle_upgrade_region_instruction(UpgradeRegion {
-                    region_id,
-                    replay_timeout,
-                    ..Default::default()
-                })
+            let reply = UpgradeRegionsHandler
+                .handle(
+                    &handler_context,
+                    Instruction::UpgradeRegion(UpgradeRegion {
+                        region_id,
+                        replay_timeout,
+                        ..Default::default()
+                    }),
+                )
                 .await;
-            assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
-            if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
-                assert!(!reply.ready);
-                assert!(reply.exists);
-                assert!(reply.error.is_none());
-            }
+            let reply = reply.unwrap().expect_upgrade_region_reply();
+            assert!(!reply.ready);
+            assert!(reply.exists);
+            assert!(reply.error.is_none());
         }
     }
 
@@ -271,40 +277,41 @@ mod tests {
         let handler_context = HandlerContext::new_for_test(mock_region_server);
 
         for replay_timeout in waits {
-            let reply = handler_context
-                .clone()
-                .handle_upgrade_region_instruction(UpgradeRegion {
-                    region_id,
-                    replay_timeout,
-                    ..Default::default()
-                })
+            let reply = UpgradeRegionsHandler
+                .handle(
+                    &handler_context,
+                    Instruction::UpgradeRegion(UpgradeRegion {
+                        region_id,
+                        replay_timeout,
+                        ..Default::default()
+                    }),
+                )
                 .await;
-            assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
-            if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
-                assert!(!reply.ready);
-                assert!(reply.exists);
-                assert!(reply.error.is_none());
-            }
-        }
-
-        let timer = Instant::now();
-        let reply = handler_context
-            .handle_upgrade_region_instruction(UpgradeRegion {
-                region_id,
-                replay_timeout: Some(Duration::from_millis(500)),
-                ..Default::default()
-            })
-            .await;
-        assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
-        // Must less than 300 ms.
-        assert!(timer.elapsed().as_millis() < 300);
-
-        if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
-            assert!(reply.ready);
+            let reply = reply.unwrap().expect_upgrade_region_reply();
+            assert!(!reply.ready);
             assert!(reply.exists);
             assert!(reply.error.is_none());
         }
+
+        let timer = Instant::now();
+        let reply = UpgradeRegionsHandler
+            .handle(
+                &handler_context,
+                Instruction::UpgradeRegion(UpgradeRegion {
+                    region_id,
+                    replay_timeout: Some(Duration::from_millis(500)),
+                    ..Default::default()
+                }),
+            )
+            .await;
+        // Must be less than 300 ms.
+        assert!(timer.elapsed().as_millis() < 300);
+
+        let reply = reply.unwrap().expect_upgrade_region_reply();
+        assert!(reply.ready);
+        assert!(reply.exists);
+        assert!(reply.error.is_none());
     }
 
     #[tokio::test]
@@ -329,37 +336,37 @@ mod tests {
 
         let handler_context = HandlerContext::new_for_test(mock_region_server);
 
-        let reply = handler_context
-            .clone()
-            .handle_upgrade_region_instruction(UpgradeRegion {
-                region_id,
-                ..Default::default()
-            })
+        let reply = UpgradeRegionsHandler
+            .handle(
+                &handler_context,
+                Instruction::UpgradeRegion(UpgradeRegion {
+                    region_id,
+                    ..Default::default()
+                }),
+            )
             .await;
-        assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
         // It didn't wait for the handle to return, so it had no idea about the error.
-        if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
-            assert!(!reply.ready);
-            assert!(reply.exists);
-            assert!(reply.error.is_none());
-        }
+        let reply = reply.unwrap().expect_upgrade_region_reply();
+        assert!(!reply.ready);
+        assert!(reply.exists);
+        assert!(reply.error.is_none());
 
-        let reply = handler_context
-            .clone()
-            .handle_upgrade_region_instruction(UpgradeRegion {
-                region_id,
-                replay_timeout: Some(Duration::from_millis(200)),
-                ..Default::default()
-            })
+        let reply = UpgradeRegionsHandler
+            .handle(
+                &handler_context,
+                Instruction::UpgradeRegion(UpgradeRegion {
+                    region_id,
+                    replay_timeout: Some(Duration::from_millis(200)),
+                    ..Default::default()
+                }),
+            )
             .await;
-        assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
-        if let InstructionReply::UpgradeRegion(reply) = reply.unwrap() {
-            assert!(!reply.ready);
-            assert!(reply.exists);
-            assert!(reply.error.is_some());
-            assert!(reply.error.unwrap().contains("mock_error"));
-        }
+        let reply = reply.unwrap().expect_upgrade_region_reply();
+        assert!(!reply.ready);
+        assert!(reply.exists);
+        assert!(reply.error.is_some());
+        assert!(reply.error.unwrap().contains("mock_error"));
     }
 }
diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
index ad805ae680..fb4065748c 100644
--- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
@@ -19,7 +19,7 @@ use api::v1::meta::MailboxMessage;
 use common_error::ext::BoxedError;
 use common_meta::distributed_time_constants::REGION_LEASE_SECS;
 use common_meta::instruction::{
-    DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
+    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
 };
 use common_procedure::{Context as ProcedureContext, Status};
 use common_telemetry::{error, info, warn};
@@ -120,10 +120,10 @@ impl DowngradeLeaderRegion {
     ) -> Instruction {
         let pc = &ctx.persistent_ctx;
         let region_id = pc.region_id;
-        Instruction::DowngradeRegion(DowngradeRegion {
+        Instruction::DowngradeRegions(vec![DowngradeRegion {
             region_id,
             flush_timeout: Some(flush_timeout),
-        })
+        }])
     }
 
     /// Tries to downgrade a leader region.
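Note: `build_downgrade_region_instruction` above still produces a single-element batch, but the new `Instruction::DowngradeRegions` variant carries a `Vec<DowngradeRegion>`, so callers can batch several regions at once. A minimal sketch of such a caller, assuming `DowngradeRegion` has exactly the two fields shown in the hunk above; `build_batched_downgrade_instruction` is a hypothetical helper, not part of this change:

```rust
use std::time::Duration;

use common_meta::instruction::{DowngradeRegion, Instruction};
use store_api::storage::RegionId;

/// Hypothetical helper: downgrade several regions of `table_id` in one
/// instruction, giving each region the same optional flush timeout.
fn build_batched_downgrade_instruction(
    table_id: u32,
    region_numbers: &[u32],
    flush_timeout: Duration,
) -> Instruction {
    Instruction::DowngradeRegions(
        region_numbers
            .iter()
            .map(|&region_number| DowngradeRegion {
                region_id: RegionId::new(table_id, region_number),
                flush_timeout: Some(flush_timeout),
            })
            .collect(),
    )
}
```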
@@ -173,12 +173,7 @@ impl DowngradeLeaderRegion {
             region_id,
             now.elapsed()
         );
-        let InstructionReply::DowngradeRegion(DowngradeRegionReply {
-            last_entry_id,
-            metadata_last_entry_id,
-            exists,
-            error,
-        }) = reply
+        let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
         else {
             return error::UnexpectedInstructionReplySnafu {
                 mailbox_message: msg.to_string(),
@@ -187,6 +182,15 @@ impl DowngradeLeaderRegion {
             .fail();
         };
 
+        // TODO(weny): handle multiple replies.
+        let DowngradeRegionReply {
+            region_id,
+            last_entry_id,
+            metadata_last_entry_id,
+            exists,
+            error,
+        } = &replies[0];
+
         if error.is_some() {
             return error::RetryLaterSnafu {
                 reason: format!(
@@ -216,12 +220,12 @@ impl DowngradeLeaderRegion {
         }
 
         if let Some(last_entry_id) = last_entry_id {
-            ctx.volatile_ctx.set_last_entry_id(last_entry_id);
+            ctx.volatile_ctx.set_last_entry_id(*last_entry_id);
         }
 
         if let Some(metadata_last_entry_id) = metadata_last_entry_id {
             ctx.volatile_ctx
-                .set_metadata_last_entry_id(metadata_last_entry_id);
+                .set_metadata_last_entry_id(*metadata_last_entry_id);
         }
 
         Ok(())
diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs
index 8197087351..247f112514 100644
--- a/src/meta-srv/src/procedure/test_util.rs
+++ b/src/meta-srv/src/procedure/test_util.rs
@@ -17,7 +17,8 @@ use std::collections::HashMap;
 use api::v1::meta::mailbox_message::Payload;
 use api::v1::meta::{HeartbeatResponse, MailboxMessage};
 use common_meta::instruction::{
-    DowngradeRegionReply, FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
+    DowngradeRegionReply, DowngradeRegionsReply, FlushRegionReply, InstructionReply, SimpleReply,
+    UpgradeRegionReply,
 };
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::key::table_route::TableRouteValue;
@@ -183,12 +184,15 @@ pub fn new_downgrade_region_reply(
         to: "meta".to_string(),
         timestamp_millis: current_time_millis(),
         payload: Some(Payload::Json(
-            serde_json::to_string(&InstructionReply::DowngradeRegion(DowngradeRegionReply {
-                last_entry_id,
-                metadata_last_entry_id: None,
-                exists: exist,
-                error,
-            }))
+            serde_json::to_string(&InstructionReply::DowngradeRegions(
+                DowngradeRegionsReply::new(vec![DowngradeRegionReply {
+                    region_id: RegionId::new(0, 0),
+                    last_entry_id,
+                    metadata_last_entry_id: None,
+                    exists: exist,
+                    error,
+                }]),
+            ))
             .unwrap(),
         )),
    }
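A possible direction for the `TODO(weny): handle multiple replies` above: instead of destructuring `&replies[0]`, look up the reply whose `region_id` matches the region this procedure step downgraded. This is only a sketch of one option, not part of the change; `find_downgrade_reply` is a hypothetical helper. It falls back to the first reply because old-protocol datanodes report `RegionId::new(0, 0)` (as `new_downgrade_region_reply` above does), and `first()` also avoids the panic that indexing an empty `replies` vector would cause:

```rust
use common_meta::instruction::DowngradeRegionReply;
use store_api::storage::RegionId;

/// Hypothetical helper: pick the reply for `region_id` out of a batched
/// downgrade reply, falling back to the first entry for old datanodes that
/// report the defaulted region id.
fn find_downgrade_reply(
    replies: &[DowngradeRegionReply],
    region_id: RegionId,
) -> Option<&DowngradeRegionReply> {
    replies
        .iter()
        .find(|reply| reply.region_id == region_id)
        .or_else(|| replies.first())
}
```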