From ee8698791259b845df9253a5b5b2175ac00c93a9 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 24 Dec 2025 10:50:27 +0800 Subject: [PATCH] feat(repartition): implement enter staging region state (#7447) * feat(repartition): implement enter staging region state Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/instruction.rs | 62 ++ src/datanode/src/heartbeat/handler.rs | 24 +- .../src/heartbeat/handler/enter_staging.rs | 243 ++++++ .../src/procedure/repartition/group.rs | 15 + .../repartition/group/enter_staging_region.rs | 717 ++++++++++++++++++ .../repartition/group/repartition_start.rs | 11 + .../src/procedure/repartition/group/utils.rs | 88 +++ .../src/procedure/repartition/test_util.rs | 6 +- src/meta-srv/src/procedure/test_util.rs | 34 +- 9 files changed, 1194 insertions(+), 6 deletions(-) create mode 100644 src/datanode/src/heartbeat/handler/enter_staging.rs create mode 100644 src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs create mode 100644 src/meta-srv/src/procedure/repartition/group/utils.rs diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 230c076673..52dd43579b 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -514,6 +514,22 @@ impl Display for GcRegionsReply { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct EnterStagingRegion { + pub region_id: RegionId, + pub partition_expr: String, +} + +impl Display for EnterStagingRegion { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "EnterStagingRegion(region_id={}, partition_expr={})", + self.region_id, self.partition_expr + ) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] pub enum Instruction { /// Opens regions. @@ -541,6 +557,8 @@ pub enum Instruction { GcRegions(GcRegions), /// Temporary suspend serving reads or writes Suspend, + /// Makes regions enter staging state. + EnterStagingRegions(Vec), } impl Instruction { @@ -597,6 +615,13 @@ impl Instruction { _ => None, } } + + pub fn into_enter_staging_regions(self) -> Option> { + match self { + Self::EnterStagingRegions(enter_staging) => Some(enter_staging), + _ => None, + } + } } /// The reply of [UpgradeRegion]. @@ -690,6 +715,28 @@ where }) } +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct EnterStagingRegionReply { + pub region_id: RegionId, + /// Returns true if the region is under the new region rule. + pub ready: bool, + /// Indicates whether the region exists. + pub exists: bool, + /// Return error if any during the operation. + pub error: Option, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct EnterStagingRegionsReply { + pub replies: Vec, +} + +impl EnterStagingRegionsReply { + pub fn new(replies: Vec) -> Self { + Self { replies } + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(tag = "type", rename_all = "snake_case")] pub enum InstructionReply { @@ -710,6 +757,7 @@ pub enum InstructionReply { FlushRegions(FlushRegionReply), GetFileRefs(GetFileRefsReply), GcRegions(GcRegionsReply), + EnterStagingRegions(EnterStagingRegionsReply), } impl Display for InstructionReply { @@ -726,6 +774,13 @@ impl Display for InstructionReply { Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply), Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply), Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply), + Self::EnterStagingRegions(reply) => { + write!( + f, + "InstructionReply::EnterStagingRegions({:?})", + reply.replies + ) + } } } } @@ -766,6 +821,13 @@ impl InstructionReply { _ => panic!("Expected FlushRegions reply"), } } + + pub fn expect_enter_staging_regions_reply(self) -> Vec { + match self { + Self::EnterStagingRegions(reply) => reply.replies, + _ => panic!("Expected EnterStagingRegion reply"), + } + } } #[cfg(test)] diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 9accd138fd..a39b3787ea 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -24,6 +24,7 @@ use store_api::storage::GcReport; mod close_region; mod downgrade_region; +mod enter_staging; mod file_ref; mod flush_region; mod gc_worker; @@ -32,6 +33,7 @@ mod upgrade_region; use crate::heartbeat::handler::close_region::CloseRegionsHandler; use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler; +use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler; use crate::heartbeat::handler::file_ref::GetFileRefsHandler; use crate::heartbeat::handler::flush_region::FlushRegionsHandler; use crate::heartbeat::handler::gc_worker::GcRegionsHandler; @@ -123,6 +125,9 @@ impl RegionHeartbeatResponseHandler { Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))), Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(), Instruction::Suspend => Ok(None), + Instruction::EnterStagingRegions(_) => { + Ok(Some(Box::new(EnterStagingRegionsHandler.into()))) + } } } } @@ -136,6 +141,7 @@ pub enum InstructionHandlers { UpgradeRegions(UpgradeRegionsHandler), GetFileRefs(GetFileRefsHandler), GcRegions(GcRegionsHandler), + EnterStagingRegions(EnterStagingRegionsHandler), } macro_rules! impl_from_handler { @@ -157,7 +163,8 @@ impl_from_handler!( DowngradeRegionsHandler => DowngradeRegions, UpgradeRegionsHandler => UpgradeRegions, GetFileRefsHandler => GetFileRefs, - GcRegionsHandler => GcRegions + GcRegionsHandler => GcRegions, + EnterStagingRegionsHandler => EnterStagingRegions ); macro_rules! dispatch_instr { @@ -202,6 +209,7 @@ dispatch_instr!( UpgradeRegions => UpgradeRegions, GetFileRefs => GetFileRefs, GcRegions => GcRegions, + EnterStagingRegions => EnterStagingRegions ); #[async_trait] @@ -254,7 +262,9 @@ mod tests { use common_meta::heartbeat::mailbox::{ HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta, }; - use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion}; + use common_meta::instruction::{ + DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion, + }; use mito2::config::MitoConfig; use mito2::engine::MITO_ENGINE_NAME; use mito2::test_util::{CreateRequestBuilder, TestEnv}; @@ -335,6 +345,16 @@ mod tests { region_id, ..Default::default() }]); + assert!( + heartbeat_handler + .is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))) + ); + + // Enter staging region + let instruction = Instruction::EnterStagingRegions(vec![EnterStagingRegion { + region_id, + partition_expr: "".to_string(), + }]); assert!( heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction))) ); diff --git a/src/datanode/src/heartbeat/handler/enter_staging.rs b/src/datanode/src/heartbeat/handler/enter_staging.rs new file mode 100644 index 0000000000..9ca2eab1b8 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/enter_staging.rs @@ -0,0 +1,243 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::instruction::{ + EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply, +}; +use common_telemetry::{error, warn}; +use futures::future::join_all; +use store_api::region_request::{EnterStagingRequest, RegionRequest}; + +use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; + +#[derive(Debug, Clone, Copy, Default)] +pub struct EnterStagingRegionsHandler; + +#[async_trait::async_trait] +impl InstructionHandler for EnterStagingRegionsHandler { + type Instruction = Vec; + + async fn handle( + &self, + ctx: &HandlerContext, + enter_staging: Self::Instruction, + ) -> Option { + let futures = enter_staging.into_iter().map(|enter_staging_region| { + Self::handle_enter_staging_region(ctx, enter_staging_region) + }); + let results = join_all(futures).await; + Some(InstructionReply::EnterStagingRegions( + EnterStagingRegionsReply::new(results), + )) + } +} + +impl EnterStagingRegionsHandler { + async fn handle_enter_staging_region( + ctx: &HandlerContext, + EnterStagingRegion { + region_id, + partition_expr, + }: EnterStagingRegion, + ) -> EnterStagingRegionReply { + let Some(writable) = ctx.region_server.is_region_leader(region_id) else { + warn!("Region: {} is not found", region_id); + return EnterStagingRegionReply { + region_id, + ready: false, + exists: false, + error: None, + }; + }; + if !writable { + warn!("Region: {} is not writable", region_id); + return EnterStagingRegionReply { + region_id, + ready: false, + exists: true, + error: Some("Region is not writable".into()), + }; + } + + match ctx + .region_server + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { partition_expr }), + ) + .await + { + Ok(_) => EnterStagingRegionReply { + region_id, + ready: true, + exists: true, + error: None, + }, + Err(err) => { + error!(err; "Failed to enter staging region"); + EnterStagingRegionReply { + region_id, + ready: false, + exists: true, + error: Some(format!("{err:?}")), + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_meta::instruction::EnterStagingRegion; + use mito2::config::MitoConfig; + use mito2::engine::MITO_ENGINE_NAME; + use mito2::test_util::{CreateRequestBuilder, TestEnv}; + use store_api::path_utils::table_dir; + use store_api::region_engine::RegionRole; + use store_api::region_request::RegionRequest; + use store_api::storage::RegionId; + + use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler; + use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; + use crate::region_server::RegionServer; + use crate::tests::{MockRegionEngine, mock_region_server}; + + const PARTITION_EXPR: &str = "partition_expr"; + + #[tokio::test] + async fn test_region_not_exist() { + let mut mock_region_server = mock_region_server(); + let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME); + mock_region_server.register_engine(mock_engine); + let handler_context = HandlerContext::new_for_test(mock_region_server); + let region_id = RegionId::new(1024, 1); + let replies = EnterStagingRegionsHandler + .handle( + &handler_context, + vec![EnterStagingRegion { + region_id, + partition_expr: "".to_string(), + }], + ) + .await + .unwrap(); + let replies = replies.expect_enter_staging_regions_reply(); + let reply = &replies[0]; + assert!(!reply.exists); + assert!(reply.error.is_none()); + assert!(!reply.ready); + } + + #[tokio::test] + async fn test_region_not_writable() { + let mock_region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + let (mock_engine, _) = + MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| { + region_engine.mock_role = Some(Some(RegionRole::Follower)); + region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0))); + }); + mock_region_server.register_test_region(region_id, mock_engine); + let handler_context = HandlerContext::new_for_test(mock_region_server); + let replies = EnterStagingRegionsHandler + .handle( + &handler_context, + vec![EnterStagingRegion { + region_id, + partition_expr: "".to_string(), + }], + ) + .await + .unwrap(); + let replies = replies.expect_enter_staging_regions_reply(); + let reply = &replies[0]; + assert!(reply.exists); + assert!(reply.error.is_some()); + assert!(!reply.ready); + } + + async fn prepare_region(region_server: &RegionServer) { + let builder = CreateRequestBuilder::new(); + let mut create_req = builder.build(); + create_req.table_dir = table_dir("test", 1024); + let region_id = RegionId::new(1024, 1); + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_enter_staging() { + let mut region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + let mut engine_env = TestEnv::new().await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine.clone())); + prepare_region(®ion_server).await; + + let handler_context = HandlerContext::new_for_test(region_server); + let replies = EnterStagingRegionsHandler + .handle( + &handler_context, + vec![EnterStagingRegion { + region_id, + partition_expr: PARTITION_EXPR.to_string(), + }], + ) + .await + .unwrap(); + let replies = replies.expect_enter_staging_regions_reply(); + let reply = &replies[0]; + assert!(reply.exists); + assert!(reply.error.is_none()); + assert!(reply.ready); + + // Should be ok to enter staging mode again with the same partition expr + let replies = EnterStagingRegionsHandler + .handle( + &handler_context, + vec![EnterStagingRegion { + region_id, + partition_expr: PARTITION_EXPR.to_string(), + }], + ) + .await + .unwrap(); + let replies = replies.expect_enter_staging_regions_reply(); + let reply = &replies[0]; + assert!(reply.exists); + assert!(reply.error.is_none()); + assert!(reply.ready); + + // Should throw error if try to enter staging mode again with a different partition expr + let replies = EnterStagingRegionsHandler + .handle( + &handler_context, + vec![EnterStagingRegion { + region_id, + partition_expr: "".to_string(), + }], + ) + .await + .unwrap(); + let replies = replies.expect_enter_staging_regions_reply(); + let reply = &replies[0]; + assert!(reply.exists); + assert!(reply.error.is_some()); + assert!(!reply.ready); + } +} diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index 7c3ee14e64..5cde3d2f42 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -12,11 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod enter_staging_region; pub(crate) mod repartition_start; pub(crate) mod update_metadata; +pub(crate) mod utils; use std::any::Any; use std::fmt::Debug; +use std::time::Duration; use common_error::ext::BoxedError; use common_meta::DatanodeId; @@ -34,6 +37,7 @@ use uuid::Uuid; use crate::error::{self, Result}; use crate::procedure::repartition::plan::RegionDescriptor; +use crate::service::mailbox::MailboxRef; pub type GroupId = Uuid; @@ -45,6 +49,10 @@ pub struct Context { pub cache_invalidator: CacheInvalidatorRef, pub table_metadata_manager: TableMetadataManagerRef, + + pub mailbox: MailboxRef, + + pub server_addr: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -184,6 +192,13 @@ impl Context { .await .context(error::TableMetadataManagerSnafu) } + + /// Returns the next operation timeout. + /// + /// If the next operation timeout is not set, it will return `None`. + pub fn next_operation_timeout(&self) -> Option { + Some(Duration::from_secs(10)) + } } /// Returns the region routes of the given table route value. diff --git a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs new file mode 100644 index 0000000000..dbbcc4283c --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs @@ -0,0 +1,717 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use api::v1::meta::MailboxMessage; +use common_meta::instruction::{ + EnterStagingRegionReply, EnterStagingRegionsReply, Instruction, InstructionReply, +}; +use common_meta::peer::Peer; +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use futures::future::join_all; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, ensure}; + +use crate::error::{self, Error, Result}; +use crate::handler::HeartbeatMailbox; +use crate::procedure::repartition::group::utils::{ + HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results, +}; +use crate::procedure::repartition::group::{Context, GroupPrepareResult, State}; +use crate::procedure::repartition::plan::RegionDescriptor; +use crate::service::mailbox::{Channel, MailboxRef}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct EnterStagingRegion; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for EnterStagingRegion { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + self.enter_staging_regions(ctx).await?; + + Ok(Self::next_state()) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl EnterStagingRegion { + #[allow(dead_code)] + fn next_state() -> (Box, Status) { + // TODO(weny): change it later. + (Box::new(EnterStagingRegion), Status::executing(true)) + } + + fn build_enter_staging_instructions( + prepare_result: &GroupPrepareResult, + targets: &[RegionDescriptor], + ) -> Result> { + let target_partition_expr_by_region = targets + .iter() + .map(|target| { + Ok(( + target.region_id, + target + .partition_expr + .as_json_str() + .context(error::SerializePartitionExprSnafu)?, + )) + }) + .collect::>>()?; + // Safety: `leader_peer` is set for all region routes, checked in `repartition_start`. + let target_region_routes_by_peer = + group_region_routes_by_peer(&prepare_result.target_routes); + let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len()); + for (peer, region_ids) in target_region_routes_by_peer { + let enter_staging_regions = region_ids + .into_iter() + .map(|region_id| common_meta::instruction::EnterStagingRegion { + region_id, + // Safety: the target_routes is constructed from the targets, so the region_id is always present in the map. + partition_expr: target_partition_expr_by_region[®ion_id].clone(), + }) + .collect(); + instructions.insert( + peer.clone(), + Instruction::EnterStagingRegions(enter_staging_regions), + ); + } + + Ok(instructions) + } + + #[allow(dead_code)] + async fn enter_staging_regions(&self, ctx: &mut Context) -> Result<()> { + let table_id = ctx.persistent_ctx.table_id; + let group_id = ctx.persistent_ctx.group_id; + // Safety: the group prepare result is set in the RepartitionStart state. + let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); + let targets = &ctx.persistent_ctx.targets; + let instructions = Self::build_enter_staging_instructions(prepare_result, targets)?; + let operation_timeout = + ctx.next_operation_timeout() + .context(error::ExceededDeadlineSnafu { + operation: "Enter staging regions", + })?; + let (peers, tasks): (Vec<_>, Vec<_>) = instructions + .iter() + .map(|(peer, instruction)| { + ( + peer, + Self::enter_staging_region( + &ctx.mailbox, + &ctx.server_addr, + peer, + instruction, + operation_timeout, + ), + ) + }) + .unzip(); + info!( + "Sent enter staging regions instructions to peers: {:?} for repartition table {}, group id {}", + peers, table_id, group_id + ); + + let format_err_msg = |idx: usize, error: &Error| { + let peer = peers[idx]; + format!( + "Failed to enter staging regions on datanode {:?}, error: {:?}", + peer, error + ) + }; + // Waits for all tasks to complete. + let results = join_all(tasks).await; + let result = handle_multiple_results(&results); + match result { + HandleMultipleResult::AllSuccessful => Ok(()), + HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu { + reason: format!( + "All retryable errors during entering staging regions for repartition table {}, group id {}: {:?}", + table_id, group_id, + retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(",") + ), + } + .fail(), + HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu { + violated: format!( + "All non retryable errors during entering staging regions for repartition table {}, group id {}: {:?}", + table_id, group_id, + non_retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(",") + ), + } + .fail(), + HandleMultipleResult::PartialRetryable { + retryable_errors, + non_retryable_errors, + } => error::UnexpectedSnafu { + violated: format!( + "Partial retryable errors during entering staging regions for repartition table {}, group id {}: {:?}, non retryable errors: {:?}", + table_id, group_id, + retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(","), + non_retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(","), + ), + } + .fail(), + } + } + + /// Enter staging region on a datanode. + /// + /// Retry: + /// - Pusher is not found. + /// - Mailbox timeout. + /// + /// Abort(non-retry): + /// - Unexpected instruction reply. + /// - Exceeded deadline of enter staging regions instruction. + /// - Target region doesn't exist on the datanode. + async fn enter_staging_region( + mailbox: &MailboxRef, + server_addr: &str, + peer: &Peer, + instruction: &Instruction, + timeout: Duration, + ) -> Result<()> { + let ch = Channel::Datanode(peer.id); + let message = MailboxMessage::json_message( + &format!("Enter staging regions: {:?}", instruction), + &format!("Metasrv@{}", server_addr), + &format!("Datanode-{}@{}", peer.id, peer.addr), + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + let now = Instant::now(); + let receiver = mailbox.send(&ch, message, timeout).await; + + let receiver = match receiver { + Ok(receiver) => receiver, + Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu { + reason: format!( + "Pusher not found for enter staging regions on datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ), + } + .fail()?, + Err(err) => { + return Err(err); + } + }; + + match receiver.await { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received enter staging regions reply: {:?}, elapsed: {:?}", + reply, + now.elapsed() + ); + let InstructionReply::EnterStagingRegions(EnterStagingRegionsReply { replies }) = + reply + else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect enter staging regions reply", + } + .fail(); + }; + for reply in replies { + Self::handle_enter_staging_region_reply(&reply, &now, peer)?; + } + + Ok(()) + } + Err(error::Error::MailboxTimeout { .. }) => { + let reason = format!( + "Mailbox received timeout for enter staging regions on datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ); + error::RetryLaterSnafu { reason }.fail() + } + Err(err) => Err(err), + } + } + + fn handle_enter_staging_region_reply( + EnterStagingRegionReply { + region_id, + ready, + exists, + error, + }: &EnterStagingRegionReply, + now: &Instant, + peer: &Peer, + ) -> Result<()> { + ensure!( + exists, + error::UnexpectedSnafu { + violated: format!( + "Region {} doesn't exist on datanode {:?}, elapsed: {:?}", + region_id, + peer, + now.elapsed() + ) + } + ); + + if error.is_some() { + return error::RetryLaterSnafu { + reason: format!( + "Failed to enter staging region {} on datanode {:?}, error: {:?}, elapsed: {:?}", + region_id, peer, error, now.elapsed() + ), + } + .fail(); + } + + ensure!( + ready, + error::RetryLaterSnafu { + reason: format!( + "Region {} is still entering staging state on datanode {:?}, elapsed: {:?}", + region_id, + peer, + now.elapsed() + ), + } + ); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::time::Duration; + + use common_meta::instruction::Instruction; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::RegionId; + + use crate::error::{self, Error}; + use crate::procedure::repartition::group::GroupPrepareResult; + use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion; + use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::test_util::{ + TestingEnv, new_persistent_context, range_expr, + }; + use crate::procedure::test_util::{ + new_close_region_reply, new_enter_staging_region_reply, send_mock_reply, + }; + use crate::service::mailbox::Channel; + + #[test] + fn test_build_enter_staging_instructions() { + let table_id = 1024; + let prepare_result = GroupPrepareResult { + source_routes: vec![RegionRoute { + region: Region { + id: RegionId::new(table_id, 1), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }], + target_routes: vec![ + RegionRoute { + region: Region { + id: RegionId::new(table_id, 1), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(table_id, 2), + ..Default::default() + }, + leader_peer: Some(Peer::empty(2)), + ..Default::default() + }, + ], + central_region: RegionId::new(table_id, 1), + central_region_datanode_id: 1, + }; + let targets = test_targets(); + let instructions = + EnterStagingRegion::build_enter_staging_instructions(&prepare_result, &targets) + .unwrap(); + + assert_eq!(instructions.len(), 2); + let instruction_1 = instructions + .get(&Peer::empty(1)) + .unwrap() + .clone() + .into_enter_staging_regions() + .unwrap(); + assert_eq!( + instruction_1, + vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(table_id, 1), + partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + }] + ); + let instruction_2 = instructions + .get(&Peer::empty(2)) + .unwrap() + .clone() + .into_enter_staging_regions() + .unwrap(); + assert_eq!( + instruction_2, + vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(table_id, 2), + partition_expr: range_expr("x", 10, 20).as_json_str().unwrap(), + }] + ); + } + + #[tokio::test] + async fn test_datanode_is_unreachable() { + let env = TestingEnv::new(); + let server_addr = "localhost"; + let peer = Peer::empty(1); + let instruction = + Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + }]); + let timeout = Duration::from_secs(10); + + let err = EnterStagingRegion::enter_staging_region( + env.mailbox_ctx.mailbox(), + server_addr, + &peer, + &instruction, + timeout, + ) + .await + .unwrap_err(); + + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + } + + #[tokio::test] + async fn test_enter_staging_region_exceeded_deadline() { + let mut env = TestingEnv::new(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + env.mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(1), tx) + .await; + let server_addr = "localhost"; + let peer = Peer::empty(1); + let instruction = + Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + }]); + let timeout = Duration::from_secs(10); + + // Sends a timeout error. + send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| { + Err(error::MailboxTimeoutSnafu { id }.build()) + }); + + let err = EnterStagingRegion::enter_staging_region( + env.mailbox_ctx.mailbox(), + server_addr, + &peer, + &instruction, + timeout, + ) + .await + .unwrap_err(); + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + } + + #[tokio::test] + async fn test_unexpected_instruction_reply() { + let mut env = TestingEnv::new(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + + let server_addr = "localhost"; + let peer = Peer::empty(1); + let instruction = + Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + }]); + let timeout = Duration::from_secs(10); + + env.mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(1), tx) + .await; + // Sends an incorrect reply. + send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| { + Ok(new_close_region_reply(id)) + }); + + let err = EnterStagingRegion::enter_staging_region( + env.mailbox_ctx.mailbox(), + server_addr, + &peer, + &instruction, + timeout, + ) + .await + .unwrap_err(); + assert_matches!(err, Error::UnexpectedInstructionReply { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_enter_staging_region_failed_to_enter_staging_state() { + let mut env = TestingEnv::new(); + let (tx, rx) = tokio::sync::mpsc::channel(1); + env.mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(1), tx) + .await; + let server_addr = "localhost"; + let peer = Peer::empty(1); + let instruction = + Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + }]); + let timeout = Duration::from_secs(10); + + // Sends a failed reply. + send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| { + Ok(new_enter_staging_region_reply( + id, + RegionId::new(1024, 1), + false, + true, + Some("test mocked".to_string()), + )) + }); + + let err = EnterStagingRegion::enter_staging_region( + env.mailbox_ctx.mailbox(), + server_addr, + &peer, + &instruction, + timeout, + ) + .await + .unwrap_err(); + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + env.mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(1), tx) + .await; + // Region doesn't exist on the datanode. + send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| { + Ok(new_enter_staging_region_reply( + id, + RegionId::new(1024, 1), + false, + false, + None, + )) + }); + + let err = EnterStagingRegion::enter_staging_region( + env.mailbox_ctx.mailbox(), + server_addr, + &peer, + &instruction, + timeout, + ) + .await + .unwrap_err(); + assert_matches!(err, Error::Unexpected { .. }); + assert!(!err.is_retryable()); + } + + fn test_prepare_result(table_id: u32) -> GroupPrepareResult { + GroupPrepareResult { + source_routes: vec![], + target_routes: vec![ + RegionRoute { + region: Region { + id: RegionId::new(table_id, 1), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(table_id, 2), + ..Default::default() + }, + leader_peer: Some(Peer::empty(2)), + ..Default::default() + }, + ], + central_region: RegionId::new(table_id, 1), + central_region_datanode_id: 1, + } + } + + fn test_targets() -> Vec { + vec![ + RegionDescriptor { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10), + }, + RegionDescriptor { + region_id: RegionId::new(1024, 2), + partition_expr: range_expr("x", 10, 20), + }, + ] + } + + #[tokio::test] + async fn test_enter_staging_regions_all_successful() { + let mut env = TestingEnv::new(); + let table_id = 1024; + let targets = test_targets(); + let mut persistent_context = new_persistent_context(table_id, vec![], targets); + persistent_context.group_prepare_result = Some(test_prepare_result(table_id)); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + env.mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(1), tx) + .await; + send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| { + Ok(new_enter_staging_region_reply( + id, + RegionId::new(1024, 1), + true, + true, + None, + )) + }); + let (tx, rx) = tokio::sync::mpsc::channel(1); + env.mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(2), tx) + .await; + send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| { + Ok(new_enter_staging_region_reply( + id, + RegionId::new(1024, 2), + true, + true, + None, + )) + }); + let mut ctx = env.create_context(persistent_context); + EnterStagingRegion + .enter_staging_regions(&mut ctx) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_enter_staging_region_retryable() { + let env = TestingEnv::new(); + let table_id = 1024; + let targets = test_targets(); + let mut persistent_context = new_persistent_context(table_id, vec![], targets); + persistent_context.group_prepare_result = Some(test_prepare_result(table_id)); + let mut ctx = env.create_context(persistent_context); + let err = EnterStagingRegion + .enter_staging_regions(&mut ctx) + .await + .unwrap_err(); + assert_matches!(err, Error::RetryLater { .. }); + assert!(err.is_retryable()); + } + + #[tokio::test] + async fn test_enter_staging_regions_non_retryable() { + let mut env = TestingEnv::new(); + let table_id = 1024; + let targets = test_targets(); + let mut persistent_context = new_persistent_context(table_id, vec![], targets); + persistent_context.group_prepare_result = Some(test_prepare_result(table_id)); + let (tx, rx) = tokio::sync::mpsc::channel(1); + env.mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(1), tx) + .await; + // Sends an incorrect reply. + send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| { + Ok(new_close_region_reply(id)) + }); + + let mut ctx = env.create_context(persistent_context.clone()); + // Datanode 1 returns unexpected reply. + // Datanode 2 is unreachable. + let err = EnterStagingRegion + .enter_staging_regions(&mut ctx) + .await + .unwrap_err(); + assert_matches!(err, Error::Unexpected { .. }); + assert!(!err.is_retryable()); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + env.mailbox_ctx + .insert_heartbeat_response_receiver(Channel::Datanode(2), tx) + .await; + // Sends an incorrect reply. + send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| { + Ok(new_close_region_reply(id)) + }); + let mut ctx = env.create_context(persistent_context); + // Datanode 1 returns unexpected reply. + // Datanode 2 returns unexpected reply. + let err = EnterStagingRegion + .enter_staging_regions(&mut ctx) + .await + .unwrap_err(); + assert_matches!(err, Error::Unexpected { .. }); + assert!(!err.is_retryable()); + } +} diff --git a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs index 5e72ce613c..7890021206 100644 --- a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs @@ -97,6 +97,17 @@ impl RepartitionStart { .map(|r| (*r).clone()) }) .collect::>>()?; + for target_region_route in &target_region_routes { + ensure!( + target_region_route.leader_peer.is_some(), + error::UnexpectedSnafu { + violated: format!( + "Leader peer is not set for region: {}", + target_region_route.region.id + ), + } + ); + } let central_region = sources[0].region_id; let central_region_datanode_id = source_region_routes[0] .leader_peer diff --git a/src/meta-srv/src/procedure/repartition/group/utils.rs b/src/meta-srv/src/procedure/repartition/group/utils.rs new file mode 100644 index 0000000000..9a933a735e --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group/utils.rs @@ -0,0 +1,88 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; +use store_api::storage::RegionId; + +use crate::error::{Error, Result}; + +/// Groups the region routes by the leader peer. +/// +/// # Panics +/// +/// Panics if the leader peer is not set for any of the region routes. +pub(crate) fn group_region_routes_by_peer( + region_routes: &[RegionRoute], +) -> HashMap<&Peer, Vec> { + let mut map: HashMap<&Peer, Vec> = HashMap::new(); + for region_route in region_routes { + map.entry(region_route.leader_peer.as_ref().unwrap()) + .or_default() + .push(region_route.region.id); + } + map +} + +/// Returns `true` if all results are successful. +fn all_successful(results: &[Result<()>]) -> bool { + results.iter().all(Result::is_ok) +} + +pub enum HandleMultipleResult<'a> { + AllSuccessful, + AllRetryable(Vec<(usize, &'a Error)>), + PartialRetryable { + retryable_errors: Vec<(usize, &'a Error)>, + non_retryable_errors: Vec<(usize, &'a Error)>, + }, + AllNonRetryable(Vec<(usize, &'a Error)>), +} + +/// Evaluates results from multiple operations and categorizes errors by retryability. +/// +/// If all operations succeed, returns `AllSuccessful`. +/// If all errors are retryable, returns `AllRetryable`. +/// If all errors are non-retryable, returns `AllNonRetryable`. +/// Otherwise, returns `PartialRetryable` with separate collections for retryable and non-retryable errors. +pub(crate) fn handle_multiple_results<'a>(results: &'a [Result<()>]) -> HandleMultipleResult<'a> { + if all_successful(results) { + return HandleMultipleResult::AllSuccessful; + } + + let mut retryable_errors = Vec::new(); + let mut non_retryable_errors = Vec::new(); + for (index, result) in results.iter().enumerate() { + if let Err(error) = result { + if error.is_retryable() { + retryable_errors.push((index, error)); + } else { + non_retryable_errors.push((index, error)); + } + } + } + + match (retryable_errors.is_empty(), non_retryable_errors.is_empty()) { + (true, false) => HandleMultipleResult::AllNonRetryable(non_retryable_errors), + (false, true) => HandleMultipleResult::AllRetryable(retryable_errors), + (false, false) => HandleMultipleResult::PartialRetryable { + retryable_errors, + non_retryable_errors, + }, + // Should not happen, but include for completeness + (true, true) => HandleMultipleResult::AllSuccessful, + } +} diff --git a/src/meta-srv/src/procedure/repartition/test_util.rs b/src/meta-srv/src/procedure/repartition/test_util.rs index 3c0ebee58a..771fe3d8e4 100644 --- a/src/meta-srv/src/procedure/repartition/test_util.rs +++ b/src/meta-srv/src/procedure/repartition/test_util.rs @@ -32,6 +32,7 @@ use crate::procedure::test_util::MailboxContext; pub struct TestingEnv { pub table_metadata_manager: TableMetadataManagerRef, pub mailbox_ctx: MailboxContext, + pub server_addr: String, } impl Default for TestingEnv { @@ -51,10 +52,11 @@ impl TestingEnv { Self { table_metadata_manager, mailbox_ctx, + server_addr: "localhost".to_string(), } } - pub fn create_context(self, persistent_context: PersistentContext) -> Context { + pub fn create_context(&self, persistent_context: PersistentContext) -> Context { let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new( self.mailbox_ctx.mailbox().clone(), MetasrvInfo { @@ -66,6 +68,8 @@ impl TestingEnv { persistent_ctx: persistent_context, table_metadata_manager: self.table_metadata_manager.clone(), cache_invalidator, + mailbox: self.mailbox_ctx.mailbox().clone(), + server_addr: self.server_addr.clone(), } } } diff --git a/src/meta-srv/src/procedure/test_util.rs b/src/meta-srv/src/procedure/test_util.rs index 1586ad5f5f..04cff16ea4 100644 --- a/src/meta-srv/src/procedure/test_util.rs +++ b/src/meta-srv/src/procedure/test_util.rs @@ -17,8 +17,8 @@ use std::collections::HashMap; use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage}; use common_meta::instruction::{ - DowngradeRegionReply, DowngradeRegionsReply, FlushRegionReply, InstructionReply, SimpleReply, - UpgradeRegionReply, UpgradeRegionsReply, + DowngradeRegionReply, DowngradeRegionsReply, EnterStagingRegionReply, EnterStagingRegionsReply, + FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, UpgradeRegionsReply, }; use common_meta::key::TableMetadataManagerRef; use common_meta::key::table_route::TableRouteValue; @@ -198,7 +198,7 @@ pub fn new_downgrade_region_reply( } } -/// Generates a [InstructionReply::UpgradeRegion] reply. +/// Generates a [InstructionReply::UpgradeRegions] reply. pub fn new_upgrade_region_reply( id: u64, ready: bool, @@ -225,6 +225,34 @@ pub fn new_upgrade_region_reply( } } +/// Generates a [InstructionReply::EnterStagingRegions] reply. +pub fn new_enter_staging_region_reply( + id: u64, + region_id: RegionId, + ready: bool, + exists: bool, + error: Option, +) -> MailboxMessage { + MailboxMessage { + id, + subject: "mock".to_string(), + from: "datanode".to_string(), + to: "meta".to_string(), + timestamp_millis: current_time_millis(), + payload: Some(Payload::Json( + serde_json::to_string(&InstructionReply::EnterStagingRegions( + EnterStagingRegionsReply::new(vec![EnterStagingRegionReply { + region_id, + ready, + exists, + error, + }]), + )) + .unwrap(), + )), + } +} + /// Mock the test data for WAL pruning. pub async fn new_wal_prune_metadata( table_metadata_manager: TableMetadataManagerRef,