feat(repartition): implement enter staging region state (#7447)

* feat(repartition): implement enter staging region state

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-12-24 10:50:27 +08:00
committed by GitHub
parent 0cea58c642
commit ee86987912
9 changed files with 1194 additions and 6 deletions

View File

@@ -514,6 +514,22 @@ impl Display for GcRegionsReply {
}
}
/// Instruction payload asking a datanode to move a single region into the
/// staging state under a new partition rule.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EnterStagingRegion {
    /// The region to move into the staging state.
    pub region_id: RegionId,
    /// The new partition expression for the region.
    /// NOTE(review): callers produce this via `as_json_str()` — presumably a
    /// JSON-serialized expression; confirm consumers parse that format.
    pub partition_expr: String,
}
impl Display for EnterStagingRegion {
    /// Renders as `EnterStagingRegion(region_id=.., partition_expr=..)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            region_id,
            partition_expr,
        } = self;
        write!(
            f,
            "EnterStagingRegion(region_id={region_id}, partition_expr={partition_expr})"
        )
    }
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens regions.
@@ -541,6 +557,8 @@ pub enum Instruction {
GcRegions(GcRegions),
/// Temporary suspend serving reads or writes
Suspend,
/// Makes regions enter staging state.
EnterStagingRegions(Vec<EnterStagingRegion>),
}
impl Instruction {
@@ -597,6 +615,13 @@ impl Instruction {
_ => None,
}
}
pub fn into_enter_staging_regions(self) -> Option<Vec<EnterStagingRegion>> {
match self {
Self::EnterStagingRegions(enter_staging) => Some(enter_staging),
_ => None,
}
}
}
/// The reply of [UpgradeRegion].
@@ -690,6 +715,28 @@ where
})
}
/// Per-region reply to an [EnterStagingRegion] instruction.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionReply {
    /// The region this reply refers to.
    pub region_id: RegionId,
    /// True if the region is now serving under the new region rule.
    pub ready: bool,
    /// Indicates whether the region exists on the datanode.
    pub exists: bool,
    /// The error message, if any occurred during the operation.
    pub error: Option<String>,
}
/// Aggregated reply to an `EnterStagingRegions` instruction, carrying one
/// [EnterStagingRegionReply] per requested region.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct EnterStagingRegionsReply {
    /// Per-region results, in the order the datanode produced them.
    pub replies: Vec<EnterStagingRegionReply>,
}

impl EnterStagingRegionsReply {
    /// Creates a reply from the per-region results.
    pub fn new(replies: Vec<EnterStagingRegionReply>) -> Self {
        Self { replies }
    }
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
@@ -710,6 +757,7 @@ pub enum InstructionReply {
FlushRegions(FlushRegionReply),
GetFileRefs(GetFileRefsReply),
GcRegions(GcRegionsReply),
EnterStagingRegions(EnterStagingRegionsReply),
}
impl Display for InstructionReply {
@@ -726,6 +774,13 @@ impl Display for InstructionReply {
Self::FlushRegions(reply) => write!(f, "InstructionReply::FlushRegions({})", reply),
Self::GetFileRefs(reply) => write!(f, "InstructionReply::GetFileRefs({})", reply),
Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegion({})", reply),
Self::EnterStagingRegions(reply) => {
write!(
f,
"InstructionReply::EnterStagingRegions({:?})",
reply.replies
)
}
}
}
}
@@ -766,6 +821,13 @@ impl InstructionReply {
_ => panic!("Expected FlushRegions reply"),
}
}
pub fn expect_enter_staging_regions_reply(self) -> Vec<EnterStagingRegionReply> {
match self {
Self::EnterStagingRegions(reply) => reply.replies,
_ => panic!("Expected EnterStagingRegion reply"),
}
}
}
#[cfg(test)]

View File

@@ -24,6 +24,7 @@ use store_api::storage::GcReport;
mod close_region;
mod downgrade_region;
mod enter_staging;
mod file_ref;
mod flush_region;
mod gc_worker;
@@ -32,6 +33,7 @@ mod upgrade_region;
use crate::heartbeat::handler::close_region::CloseRegionsHandler;
use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
@@ -123,6 +125,9 @@ impl RegionHeartbeatResponseHandler {
Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
Instruction::Suspend => Ok(None),
Instruction::EnterStagingRegions(_) => {
Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
}
}
}
}
@@ -136,6 +141,7 @@ pub enum InstructionHandlers {
UpgradeRegions(UpgradeRegionsHandler),
GetFileRefs(GetFileRefsHandler),
GcRegions(GcRegionsHandler),
EnterStagingRegions(EnterStagingRegionsHandler),
}
macro_rules! impl_from_handler {
@@ -157,7 +163,8 @@ impl_from_handler!(
DowngradeRegionsHandler => DowngradeRegions,
UpgradeRegionsHandler => UpgradeRegions,
GetFileRefsHandler => GetFileRefs,
GcRegionsHandler => GcRegions
GcRegionsHandler => GcRegions,
EnterStagingRegionsHandler => EnterStagingRegions
);
macro_rules! dispatch_instr {
@@ -202,6 +209,7 @@ dispatch_instr!(
UpgradeRegions => UpgradeRegions,
GetFileRefs => GetFileRefs,
GcRegions => GcRegions,
EnterStagingRegions => EnterStagingRegions
);
#[async_trait]
@@ -254,7 +262,9 @@ mod tests {
use common_meta::heartbeat::mailbox::{
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
};
use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
use common_meta::instruction::{
DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion,
};
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
@@ -335,6 +345,16 @@ mod tests {
region_id,
..Default::default()
}]);
assert!(
heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction)))
);
// Enter staging region
let instruction = Instruction::EnterStagingRegions(vec![EnterStagingRegion {
region_id,
partition_expr: "".to_string(),
}]);
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
);

View File

@@ -0,0 +1,243 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{
EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply,
};
use common_telemetry::{error, warn};
use futures::future::join_all;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
/// Handles `EnterStagingRegions` instructions: asks the local region server to
/// move each listed region into the staging state and reports per-region results.
#[derive(Debug, Clone, Copy, Default)]
pub struct EnterStagingRegionsHandler;
#[async_trait::async_trait]
impl InstructionHandler for EnterStagingRegionsHandler {
    type Instruction = Vec<EnterStagingRegion>;

    /// Processes all regions concurrently and aggregates the per-region
    /// replies into a single `EnterStagingRegions` reply.
    async fn handle(
        &self,
        ctx: &HandlerContext,
        enter_staging: Self::Instruction,
    ) -> Option<InstructionReply> {
        let mut pending = Vec::with_capacity(enter_staging.len());
        for region in enter_staging {
            pending.push(Self::handle_enter_staging_region(ctx, region));
        }
        let replies = join_all(pending).await;
        Some(InstructionReply::EnterStagingRegions(
            EnterStagingRegionsReply::new(replies),
        ))
    }
}
impl EnterStagingRegionsHandler {
    /// Moves a single region into the staging state via the region server.
    ///
    /// The reply encodes three outcomes:
    /// - region unknown to this datanode: `exists: false`, no error;
    /// - region present but not the writable leader: `exists: true` with an error;
    /// - request submitted: `ready` reflects whether the engine accepted it.
    async fn handle_enter_staging_region(
        ctx: &HandlerContext,
        EnterStagingRegion {
            region_id,
            partition_expr,
        }: EnterStagingRegion,
    ) -> EnterStagingRegionReply {
        // `is_region_leader` returns `None` when the region is not found on
        // this datanode.
        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
            warn!("Region: {} is not found", region_id);
            return EnterStagingRegionReply {
                region_id,
                ready: false,
                exists: false,
                error: None,
            };
        };
        if !writable {
            warn!("Region: {} is not writable", region_id);
            return EnterStagingRegionReply {
                region_id,
                ready: false,
                exists: true,
                error: Some("Region is not writable".into()),
            };
        }
        // Forward the enter-staging request to the region engine.
        match ctx
            .region_server
            .handle_request(
                region_id,
                RegionRequest::EnterStaging(EnterStagingRequest { partition_expr }),
            )
            .await
        {
            Ok(_) => EnterStagingRegionReply {
                region_id,
                ready: true,
                exists: true,
                error: None,
            },
            Err(err) => {
                error!(err; "Failed to enter staging region");
                EnterStagingRegionReply {
                    region_id,
                    ready: false,
                    exists: true,
                    // Debug-format the error so the metasrv side can log the full chain.
                    error: Some(format!("{err:?}")),
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::instruction::EnterStagingRegion;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::region_server::RegionServer;
    use crate::tests::{MockRegionEngine, mock_region_server};

    const PARTITION_EXPR: &str = "partition_expr";

    // An unknown region must be reported as `exists: false` without an error.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);
        let region_id = RegionId::new(1024, 1);
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: "".to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(!reply.ready);
    }

    // A follower (non-writable) region must be rejected with an error while
    // still reporting `exists: true`.
    #[tokio::test]
    async fn test_region_not_writable() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Report the region as a follower so the handler refuses to proceed.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: "".to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(!reply.ready);
    }

    // Creates a real mito region (table 1024, region 1) on the given server.
    async fn prepare_region(region_server: &RegionServer) {
        let builder = CreateRequestBuilder::new();
        let mut create_req = builder.build();
        create_req.table_dir = table_dir("test", 1024);
        let region_id = RegionId::new(1024, 1);
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
    }

    // End-to-end against a real mito engine: entering staging succeeds, is
    // idempotent for the same partition expr, and fails for a different one.
    #[tokio::test]
    async fn test_enter_staging() {
        let mut region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let mut engine_env = TestEnv::new().await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        prepare_region(&region_server).await;
        let handler_context = HandlerContext::new_for_test(region_server);
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: PARTITION_EXPR.to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.ready);
        // Should be ok to enter staging mode again with the same partition expr
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: PARTITION_EXPR.to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.ready);
        // Should throw error if try to enter staging mode again with a different partition expr
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: "".to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(!reply.ready);
    }
}

View File

@@ -12,11 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod enter_staging_region;
pub(crate) mod repartition_start;
pub(crate) mod update_metadata;
pub(crate) mod utils;
use std::any::Any;
use std::fmt::Debug;
use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::DatanodeId;
@@ -34,6 +37,7 @@ use uuid::Uuid;
use crate::error::{self, Result};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::service::mailbox::MailboxRef;
pub type GroupId = Uuid;
@@ -45,6 +49,10 @@ pub struct Context {
pub cache_invalidator: CacheInvalidatorRef,
pub table_metadata_manager: TableMetadataManagerRef,
pub mailbox: MailboxRef,
pub server_addr: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -184,6 +192,13 @@ impl Context {
.await
.context(error::TableMetadataManagerSnafu)
}
    /// Returns the timeout to use for the next remote operation.
    ///
    /// NOTE(review): currently hard-coded to 10 seconds and never returns
    /// `None`, although callers treat `None` as an exceeded deadline —
    /// confirm the intended behavior once per-procedure deadline tracking
    /// is implemented.
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        Some(Duration::from_secs(10))
    }
}
/// Returns the region routes of the given table route value.

View File

@@ -0,0 +1,717 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{
EnterStagingRegionReply, EnterStagingRegionsReply, Instruction, InstructionReply,
};
use common_meta::peer::Peer;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
use crate::procedure::repartition::group::{Context, GroupPrepareResult, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::service::mailbox::{Channel, MailboxRef};
/// Repartition procedure state that instructs the datanodes hosting the target
/// regions to move those regions into the staging state.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnterStagingRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for EnterStagingRegion {
    /// Sends enter-staging instructions to all target peers, then transitions
    /// to the next state.
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        self.enter_staging_regions(ctx).await?;
        Ok(Self::next_state())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl EnterStagingRegion {
    /// Returns the follow-up state after entering staging.
    ///
    /// Placeholder: currently loops back to `EnterStagingRegion` itself.
    #[allow(dead_code)]
    fn next_state() -> (Box<dyn State>, Status) {
        // TODO(weny): change it later.
        (Box::new(EnterStagingRegion), Status::executing(true))
    }
    /// Builds one `EnterStagingRegions` instruction per leader peer, covering
    /// all target regions of the repartition group.
    ///
    /// Serializes each target's partition expression to JSON and groups the
    /// target region routes by their leader peer so a single mailbox message
    /// per datanode carries all of that datanode's regions.
    ///
    /// # Errors
    ///
    /// Returns an error if a partition expression fails to serialize.
    fn build_enter_staging_instructions(
        prepare_result: &GroupPrepareResult,
        targets: &[RegionDescriptor],
    ) -> Result<HashMap<Peer, Instruction>> {
        // region_id -> serialized partition expression.
        let target_partition_expr_by_region = targets
            .iter()
            .map(|target| {
                Ok((
                    target.region_id,
                    target
                        .partition_expr
                        .as_json_str()
                        .context(error::SerializePartitionExprSnafu)?,
                ))
            })
            .collect::<Result<HashMap<_, _>>>()?;
        // Safety: `leader_peer` is set for all region routes, checked in `repartition_start`.
        let target_region_routes_by_peer =
            group_region_routes_by_peer(&prepare_result.target_routes);
        let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
        for (peer, region_ids) in target_region_routes_by_peer {
            let enter_staging_regions = region_ids
                .into_iter()
                .map(|region_id| common_meta::instruction::EnterStagingRegion {
                    region_id,
                    // Safety: the target_routes is constructed from the targets, so the region_id is always present in the map.
                    partition_expr: target_partition_expr_by_region[&region_id].clone(),
                })
                .collect();
            instructions.insert(
                peer.clone(),
                Instruction::EnterStagingRegions(enter_staging_regions),
            );
        }
        Ok(instructions)
    }
    /// Fans out enter-staging instructions to every target peer and waits for
    /// all of them to complete.
    ///
    /// Error categorization of the combined results:
    /// - all peers succeeded: `Ok(())`;
    /// - all failures retryable: a retryable `RetryLater` error (the procedure
    ///   framework will retry the whole state);
    /// - any non-retryable failure (all or partial): a non-retryable
    ///   `Unexpected` error, aborting the procedure.
    #[allow(dead_code)]
    async fn enter_staging_regions(&self, ctx: &mut Context) -> Result<()> {
        let table_id = ctx.persistent_ctx.table_id;
        let group_id = ctx.persistent_ctx.group_id;
        // Safety: the group prepare result is set in the RepartitionStart state.
        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
        let targets = &ctx.persistent_ctx.targets;
        let instructions = Self::build_enter_staging_instructions(prepare_result, targets)?;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Enter staging regions",
                })?;
        // One mailbox task per peer; keep peers alongside the futures so error
        // messages can name the failing datanode by index.
        let (peers, tasks): (Vec<_>, Vec<_>) = instructions
            .iter()
            .map(|(peer, instruction)| {
                (
                    peer,
                    Self::enter_staging_region(
                        &ctx.mailbox,
                        &ctx.server_addr,
                        peer,
                        instruction,
                        operation_timeout,
                    ),
                )
            })
            .unzip();
        info!(
            "Sent enter staging regions instructions to peers: {:?} for repartition table {}, group id {}",
            peers, table_id, group_id
        );
        let format_err_msg = |idx: usize, error: &Error| {
            let peer = peers[idx];
            format!(
                "Failed to enter staging regions on datanode {:?}, error: {:?}",
                peer, error
            )
        };
        // Waits for all tasks to complete.
        let results = join_all(tasks).await;
        let result = handle_multiple_results(&results);
        match result {
            HandleMultipleResult::AllSuccessful => Ok(()),
            HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
                reason: format!(
                    "All retryable errors during entering staging regions for repartition table {}, group id {}: {:?}",
                    table_id, group_id,
                    retryable_errors
                        .iter()
                        .map(|(idx, error)| format_err_msg(*idx, error))
                        .collect::<Vec<_>>()
                        .join(",")
                ),
            }
            .fail(),
            HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
                violated: format!(
                    "All non retryable errors during entering staging regions for repartition table {}, group id {}: {:?}",
                    table_id, group_id,
                    non_retryable_errors
                        .iter()
                        .map(|(idx, error)| format_err_msg(*idx, error))
                        .collect::<Vec<_>>()
                        .join(",")
                ),
            }
            .fail(),
            HandleMultipleResult::PartialRetryable {
                retryable_errors,
                non_retryable_errors,
            } => error::UnexpectedSnafu {
                violated: format!(
                    "Partial retryable errors during entering staging regions for repartition table {}, group id {}: {:?}, non retryable errors: {:?}",
                    table_id, group_id,
                    retryable_errors
                        .iter()
                        .map(|(idx, error)| format_err_msg(*idx, error))
                        .collect::<Vec<_>>()
                        .join(","),
                    non_retryable_errors
                        .iter()
                        .map(|(idx, error)| format_err_msg(*idx, error))
                        .collect::<Vec<_>>()
                        .join(","),
                ),
            }
            .fail(),
        }
    }
    /// Enter staging region on a datanode.
    ///
    /// Sends a single `EnterStagingRegions` instruction over the mailbox and
    /// validates every per-region reply.
    ///
    /// Retry:
    /// - Pusher is not found.
    /// - Mailbox timeout.
    ///
    /// Abort(non-retry):
    /// - Unexpected instruction reply.
    /// - Exceeded deadline of enter staging regions instruction.
    /// - Target region doesn't exist on the datanode.
    async fn enter_staging_region(
        mailbox: &MailboxRef,
        server_addr: &str,
        peer: &Peer,
        instruction: &Instruction,
        timeout: Duration,
    ) -> Result<()> {
        let ch = Channel::Datanode(peer.id);
        let message = MailboxMessage::json_message(
            &format!("Enter staging regions: {:?}", instruction),
            &format!("Metasrv@{}", server_addr),
            &format!("Datanode-{}@{}", peer.id, peer.addr),
            common_time::util::current_time_millis(),
            &instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: instruction.to_string(),
        })?;
        // Track elapsed time for diagnostics in every error path below.
        let now = Instant::now();
        let receiver = mailbox.send(&ch, message, timeout).await;
        let receiver = match receiver {
            Ok(receiver) => receiver,
            // The datanode's heartbeat channel is gone; it may reconnect, so retry.
            Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
                reason: format!(
                    "Pusher not found for enter staging regions on datanode {:?}, elapsed: {:?}",
                    peer,
                    now.elapsed()
                ),
            }
            .fail()?,
            Err(err) => {
                return Err(err);
            }
        };
        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received enter staging regions reply: {:?}, elapsed: {:?}",
                    reply,
                    now.elapsed()
                );
                // Any other reply variant means protocol confusion: abort.
                let InstructionReply::EnterStagingRegions(EnterStagingRegionsReply { replies }) =
                    reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect enter staging regions reply",
                    }
                    .fail();
                };
                // Every region in the batch must have succeeded.
                for reply in replies {
                    Self::handle_enter_staging_region_reply(&reply, &now, peer)?;
                }
                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox received timeout for enter staging regions on datanode {:?}, elapsed: {:?}",
                    peer,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }
    /// Validates a single per-region reply from a datanode.
    ///
    /// - Region missing on the datanode: non-retryable `Unexpected` error.
    /// - Datanode reported an operation error: retryable `RetryLater`.
    /// - Region not yet ready: retryable `RetryLater`.
    fn handle_enter_staging_region_reply(
        EnterStagingRegionReply {
            region_id,
            ready,
            exists,
            error,
        }: &EnterStagingRegionReply,
        now: &Instant,
        peer: &Peer,
    ) -> Result<()> {
        ensure!(
            exists,
            error::UnexpectedSnafu {
                violated: format!(
                    "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
                    region_id,
                    peer,
                    now.elapsed()
                )
            }
        );
        if error.is_some() {
            return error::RetryLaterSnafu {
                reason: format!(
                    "Failed to enter staging region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                    region_id, peer, error, now.elapsed()
                ),
            }
            .fail();
        }
        ensure!(
            ready,
            error::RetryLaterSnafu {
                reason: format!(
                    "Region {} is still entering staging state on datanode {:?}, elapsed: {:?}",
                    region_id,
                    peer,
                    now.elapsed()
                ),
            }
        );
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::instruction::Instruction;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use crate::error::{self, Error};
    use crate::procedure::repartition::group::GroupPrepareResult;
    use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
    use crate::procedure::repartition::plan::RegionDescriptor;
    use crate::procedure::repartition::test_util::{
        TestingEnv, new_persistent_context, range_expr,
    };
    use crate::procedure::test_util::{
        new_close_region_reply, new_enter_staging_region_reply, send_mock_reply,
    };
    use crate::service::mailbox::Channel;

    // Instructions must be grouped by leader peer with the matching serialized
    // partition expression per region.
    #[test]
    fn test_build_enter_staging_instructions() {
        let table_id = 1024;
        let prepare_result = GroupPrepareResult {
            source_routes: vec![RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, 1),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(1)),
                ..Default::default()
            }],
            target_routes: vec![
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 1),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 2),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(2)),
                    ..Default::default()
                },
            ],
            central_region: RegionId::new(table_id, 1),
            central_region_datanode_id: 1,
        };
        let targets = test_targets();
        let instructions =
            EnterStagingRegion::build_enter_staging_instructions(&prepare_result, &targets)
                .unwrap();
        assert_eq!(instructions.len(), 2);
        let instruction_1 = instructions
            .get(&Peer::empty(1))
            .unwrap()
            .clone()
            .into_enter_staging_regions()
            .unwrap();
        assert_eq!(
            instruction_1,
            vec![common_meta::instruction::EnterStagingRegion {
                region_id: RegionId::new(table_id, 1),
                partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
            }]
        );
        let instruction_2 = instructions
            .get(&Peer::empty(2))
            .unwrap()
            .clone()
            .into_enter_staging_regions()
            .unwrap();
        assert_eq!(
            instruction_2,
            vec![common_meta::instruction::EnterStagingRegion {
                region_id: RegionId::new(table_id, 2),
                partition_expr: range_expr("x", 10, 20).as_json_str().unwrap(),
            }]
        );
    }

    // No pusher registered for the datanode -> retryable error.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let env = TestingEnv::new();
        let server_addr = "localhost";
        let peer = Peer::empty(1);
        let instruction =
            Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
                region_id: RegionId::new(1024, 1),
                partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
            }]);
        let timeout = Duration::from_secs(10);
        let err = EnterStagingRegion::enter_staging_region(
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
            &instruction,
            timeout,
        )
        .await
        .unwrap_err();
        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    // Mailbox timeout -> retryable error.
    #[tokio::test]
    async fn test_enter_staging_region_exceeded_deadline() {
        let mut env = TestingEnv::new();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        let server_addr = "localhost";
        let peer = Peer::empty(1);
        let instruction =
            Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
                region_id: RegionId::new(1024, 1),
                partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
            }]);
        let timeout = Duration::from_secs(10);
        // Sends a timeout error.
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });
        let err = EnterStagingRegion::enter_staging_region(
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
            &instruction,
            timeout,
        )
        .await
        .unwrap_err();
        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    // A reply of the wrong variant -> non-retryable error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let mut env = TestingEnv::new();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let server_addr = "localhost";
        let peer = Peer::empty(1);
        let instruction =
            Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
                region_id: RegionId::new(1024, 1),
                partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
            }]);
        let timeout = Duration::from_secs(10);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        // Sends an incorrect reply.
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_close_region_reply(id))
        });
        let err = EnterStagingRegion::enter_staging_region(
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
            &instruction,
            timeout,
        )
        .await
        .unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    // A reply carrying an operation error -> retryable; a reply with
    // `exists: false` -> non-retryable.
    #[tokio::test]
    async fn test_enter_staging_region_failed_to_enter_staging_state() {
        let mut env = TestingEnv::new();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        let server_addr = "localhost";
        let peer = Peer::empty(1);
        let instruction =
            Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
                region_id: RegionId::new(1024, 1),
                partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
            }]);
        let timeout = Duration::from_secs(10);
        // Sends a failed reply.
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_enter_staging_region_reply(
                id,
                RegionId::new(1024, 1),
                false,
                true,
                Some("test mocked".to_string()),
            ))
        });
        let err = EnterStagingRegion::enter_staging_region(
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
            &instruction,
            timeout,
        )
        .await
        .unwrap_err();
        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        // Region doesn't exist on the datanode.
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_enter_staging_region_reply(
                id,
                RegionId::new(1024, 1),
                false,
                false,
                None,
            ))
        });
        let err = EnterStagingRegion::enter_staging_region(
            env.mailbox_ctx.mailbox(),
            server_addr,
            &peer,
            &instruction,
            timeout,
        )
        .await
        .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
    }

    // Prepare result with two target regions led by datanodes 1 and 2.
    fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
        GroupPrepareResult {
            source_routes: vec![],
            target_routes: vec![
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 1),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 2),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(2)),
                    ..Default::default()
                },
            ],
            central_region: RegionId::new(table_id, 1),
            central_region_datanode_id: 1,
        }
    }

    // Two target descriptors with disjoint range expressions.
    fn test_targets() -> Vec<RegionDescriptor> {
        vec![
            RegionDescriptor {
                region_id: RegionId::new(1024, 1),
                partition_expr: range_expr("x", 0, 10),
            },
            RegionDescriptor {
                region_id: RegionId::new(1024, 2),
                partition_expr: range_expr("x", 10, 20),
            },
        ]
    }

    // Both datanodes reply ready -> the whole state succeeds.
    #[tokio::test]
    async fn test_enter_staging_regions_all_successful() {
        let mut env = TestingEnv::new();
        let table_id = 1024;
        let targets = test_targets();
        let mut persistent_context = new_persistent_context(table_id, vec![], targets);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_enter_staging_region_reply(
                id,
                RegionId::new(1024, 1),
                true,
                true,
                None,
            ))
        });
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(2), tx)
            .await;
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_enter_staging_region_reply(
                id,
                RegionId::new(1024, 2),
                true,
                true,
                None,
            ))
        });
        let mut ctx = env.create_context(persistent_context);
        EnterStagingRegion
            .enter_staging_regions(&mut ctx)
            .await
            .unwrap();
    }

    // Both datanodes unreachable -> all errors retryable -> RetryLater.
    #[tokio::test]
    async fn test_enter_staging_region_retryable() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let targets = test_targets();
        let mut persistent_context = new_persistent_context(table_id, vec![], targets);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
        let mut ctx = env.create_context(persistent_context);
        let err = EnterStagingRegion
            .enter_staging_regions(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    // Any non-retryable failure (partial or total) aborts with Unexpected.
    #[tokio::test]
    async fn test_enter_staging_regions_non_retryable() {
        let mut env = TestingEnv::new();
        let table_id = 1024;
        let targets = test_targets();
        let mut persistent_context = new_persistent_context(table_id, vec![], targets);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        // Sends an incorrect reply.
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_close_region_reply(id))
        });
        let mut ctx = env.create_context(persistent_context.clone());
        // Datanode 1 returns unexpected reply.
        // Datanode 2 is unreachable.
        let err = EnterStagingRegion
            .enter_staging_regions(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(2), tx)
            .await;
        // Sends an incorrect reply.
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_close_region_reply(id))
        });
        let mut ctx = env.create_context(persistent_context);
        // Datanode 1 returns unexpected reply.
        // Datanode 2 returns unexpected reply.
        let err = EnterStagingRegion
            .enter_staging_regions(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
    }
}

View File

@@ -97,6 +97,17 @@ impl RepartitionStart {
.map(|r| (*r).clone())
})
.collect::<Result<Vec<_>>>()?;
for target_region_route in &target_region_routes {
ensure!(
target_region_route.leader_peer.is_some(),
error::UnexpectedSnafu {
violated: format!(
"Leader peer is not set for region: {}",
target_region_route.region.id
),
}
);
}
let central_region = sources[0].region_id;
let central_region_datanode_id = source_region_routes[0]
.leader_peer

View File

@@ -0,0 +1,88 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use store_api::storage::RegionId;
use crate::error::{Error, Result};
/// Groups the region routes by the leader peer.
///
/// Returns a map from each leader [`Peer`] to the ids of the regions it leads,
/// preserving the order in which the routes appear in `region_routes`.
///
/// # Panics
///
/// Panics if the leader peer is not set for any of the region routes.
pub(crate) fn group_region_routes_by_peer(
    region_routes: &[RegionRoute],
) -> HashMap<&Peer, Vec<RegionId>> {
    let mut map: HashMap<&Peer, Vec<RegionId>> = HashMap::new();
    for region_route in region_routes {
        // Callers are expected to validate routes before grouping (e.g. the
        // repartition start state ensures every target route has a leader
        // peer), so a missing leader here indicates a bug upstream.
        let leader_peer = region_route
            .leader_peer
            .as_ref()
            .expect("leader peer must be set for region route");
        map.entry(leader_peer)
            .or_default()
            .push(region_route.region.id);
    }
    map
}
/// Returns `true` if all results are successful.
fn all_successful(results: &[Result<()>]) -> bool {
    // Equivalent to checking `all(is_ok)`: no error means full success.
    !results.iter().any(|result| result.is_err())
}
/// The categorized outcome of evaluating a batch of operation results.
///
/// Each error is paired with the index of the result it came from, so the
/// caller can map a failure back to the corresponding operation.
pub enum HandleMultipleResult<'a> {
    /// Every operation succeeded.
    AllSuccessful,
    /// Every failed operation returned a retryable error.
    AllRetryable(Vec<(usize, &'a Error)>),
    /// Failures are a mix of retryable and non-retryable errors.
    PartialRetryable {
        /// Errors that may succeed if the operation is retried.
        retryable_errors: Vec<(usize, &'a Error)>,
        /// Errors that retrying will not resolve.
        non_retryable_errors: Vec<(usize, &'a Error)>,
    },
    /// Every failed operation returned a non-retryable error.
    AllNonRetryable(Vec<(usize, &'a Error)>),
}
/// Evaluates results from multiple operations and categorizes errors by retryability.
///
/// If all operations succeed, returns `AllSuccessful`.
/// If all errors are retryable, returns `AllRetryable`.
/// If all errors are non-retryable, returns `AllNonRetryable`.
/// Otherwise, returns `PartialRetryable` with separate collections for retryable and non-retryable errors.
pub(crate) fn handle_multiple_results<'a>(results: &'a [Result<()>]) -> HandleMultipleResult<'a> {
    if all_successful(results) {
        return HandleMultipleResult::AllSuccessful;
    }

    // Keep each error together with the index of its originating operation,
    // then split the failures by whether they are worth retrying.
    let (retryable_errors, non_retryable_errors): (Vec<_>, Vec<_>) = results
        .iter()
        .enumerate()
        .filter_map(|(index, result)| result.as_ref().err().map(|error| (index, error)))
        .partition(|(_, error)| error.is_retryable());

    match (retryable_errors.is_empty(), non_retryable_errors.is_empty()) {
        (true, false) => HandleMultipleResult::AllNonRetryable(non_retryable_errors),
        (false, true) => HandleMultipleResult::AllRetryable(retryable_errors),
        (false, false) => HandleMultipleResult::PartialRetryable {
            retryable_errors,
            non_retryable_errors,
        },
        // Unreachable after the early return above (at least one result failed),
        // but kept for completeness.
        (true, true) => HandleMultipleResult::AllSuccessful,
    }
}

View File

@@ -32,6 +32,7 @@ use crate::procedure::test_util::MailboxContext;
/// Test fixture bundling the pieces needed to build a procedure `Context`.
pub struct TestingEnv {
    /// Table metadata manager shared with the contexts this env creates.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Mock mailbox context used to exchange instructions and replies with
    /// fake datanodes in tests.
    pub mailbox_ctx: MailboxContext,
    // Metasrv server address passed into created contexts
    // (defaults to "localhost" in `new`).
    pub server_addr: String,
}
impl Default for TestingEnv {
@@ -51,10 +52,11 @@ impl TestingEnv {
Self {
table_metadata_manager,
mailbox_ctx,
server_addr: "localhost".to_string(),
}
}
pub fn create_context(self, persistent_context: PersistentContext) -> Context {
pub fn create_context(&self, persistent_context: PersistentContext) -> Context {
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
self.mailbox_ctx.mailbox().clone(),
MetasrvInfo {
@@ -66,6 +68,8 @@ impl TestingEnv {
persistent_ctx: persistent_context,
table_metadata_manager: self.table_metadata_manager.clone(),
cache_invalidator,
mailbox: self.mailbox_ctx.mailbox().clone(),
server_addr: self.server_addr.clone(),
}
}
}

View File

@@ -17,8 +17,8 @@ use std::collections::HashMap;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage};
use common_meta::instruction::{
DowngradeRegionReply, DowngradeRegionsReply, FlushRegionReply, InstructionReply, SimpleReply,
UpgradeRegionReply, UpgradeRegionsReply,
DowngradeRegionReply, DowngradeRegionsReply, EnterStagingRegionReply, EnterStagingRegionsReply,
FlushRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, UpgradeRegionsReply,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_route::TableRouteValue;
@@ -198,7 +198,7 @@ pub fn new_downgrade_region_reply(
}
}
/// Generates a [InstructionReply::UpgradeRegion] reply.
/// Generates a [InstructionReply::UpgradeRegions] reply.
pub fn new_upgrade_region_reply(
id: u64,
ready: bool,
@@ -225,6 +225,34 @@ pub fn new_upgrade_region_reply(
}
}
/// Generates a [InstructionReply::EnterStagingRegions] reply.
///
/// Builds a mock mailbox message carrying a single-region enter-staging
/// reply with the given status fields.
pub fn new_enter_staging_region_reply(
    id: u64,
    region_id: RegionId,
    ready: bool,
    exists: bool,
    error: Option<String>,
) -> MailboxMessage {
    // Wrap the single region reply in the batched reply type, then serialize
    // it as the JSON payload of the mailbox message.
    let reply = InstructionReply::EnterStagingRegions(EnterStagingRegionsReply::new(vec![
        EnterStagingRegionReply {
            region_id,
            ready,
            exists,
            error,
        },
    ]));
    let payload = Payload::Json(serde_json::to_string(&reply).unwrap());

    MailboxMessage {
        id,
        subject: "mock".to_string(),
        from: "datanode".to_string(),
        to: "meta".to_string(),
        timestamp_millis: current_time_millis(),
        payload: Some(payload),
    }
}
/// Mock the test data for WAL pruning.
pub async fn new_wal_prune_metadata(
table_metadata_manager: TableMetadataManagerRef,