feat: implement RemapManifest and ApplyStagingManifest for repartition procedure (#7509)

* feat: add RemapManifest and ApplyStagingManifest heartbeat handler

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: add `RemapManifest` and `ApplyStagingManifest` states for repartition

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-01-05 16:33:44 +08:00
committed by GitHub
parent 527a1c03f3
commit 2d756b24c8
19 changed files with 1459 additions and 91 deletions

1
Cargo.lock generated
View File

@@ -4062,6 +4062,7 @@ dependencies = [
"mito2",
"num_cpus",
"object-store",
"partition",
"prometheus",
"prost 0.13.5",
"query",

View File

@@ -530,6 +530,49 @@ impl Display for EnterStagingRegion {
}
}
/// Instruction payload that asks a datanode to remap the manifests of a set of
/// input regions onto new regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RemapManifest {
/// The region this instruction is addressed to.
pub region_id: RegionId,
/// Regions to remap manifests from.
pub input_regions: Vec<RegionId>,
/// For each old region, which new regions should receive its files.
pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
/// New partition expressions for the new regions.
pub new_partition_exprs: HashMap<RegionId, String>,
}
impl Display for RemapManifest {
    /// Formats the instruction as `RemapManifest(...)`: `region_id` is rendered
    /// with `Display`, the collection fields with `Debug`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            region_id,
            input_regions,
            region_mapping,
            new_partition_exprs,
        } = self;
        f.write_fmt(format_args!(
            "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})",
            region_id, input_regions, region_mapping, new_partition_exprs
        ))
    }
}
/// Instruction payload that asks a datanode to apply a staging manifest to one
/// region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApplyStagingManifest {
/// The region ID to apply the staging manifest to.
pub region_id: RegionId,
/// The partition expression of the staging region.
pub partition_expr: String,
/// The region that stores the staging manifests in its staging blob storage.
pub central_region_id: RegionId,
/// The relative path to the staging manifest within the central region's staging blob storage.
pub manifest_path: String,
}
impl Display for ApplyStagingManifest {
    /// Formats the instruction as `ApplyStagingManifest(...)`, rendering every
    /// field with its `Display` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            region_id,
            partition_expr,
            central_region_id,
            manifest_path,
        } = self;
        f.write_fmt(format_args!(
            "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})",
            region_id, partition_expr, central_region_id, manifest_path
        ))
    }
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens regions.
@@ -559,6 +602,10 @@ pub enum Instruction {
Suspend,
/// Makes regions enter staging state.
EnterStagingRegions(Vec<EnterStagingRegion>),
/// Remaps manifests for a region.
RemapManifest(RemapManifest),
/// Applies staging manifests for a region.
ApplyStagingManifests(Vec<ApplyStagingManifest>),
}
impl Instruction {
@@ -737,6 +784,48 @@ impl EnterStagingRegionsReply {
}
}
/// Reply to a `RemapManifest` instruction.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct RemapManifestReply {
/// `false` if the region does not exist.
pub exists: bool,
/// A map from region IDs to their corresponding remapped manifest paths.
pub manifest_paths: HashMap<RegionId, String>,
/// The error message, if an error occurred during the operation.
pub error: Option<String>,
}
impl Display for RemapManifestReply {
    /// Formats the reply as `RemapManifestReply(...)`.
    ///
    /// `exists` is included so that a "region not found" reply (no paths, no
    /// error) can be told apart from a successful reply with no paths.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "RemapManifestReply(exists={}, manifest_paths={:?}, error={:?})",
            self.exists, self.manifest_paths, self.error
        )
    }
}
/// Reply to an `ApplyStagingManifests` instruction, carrying the per-request
/// replies.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestsReply {
/// The individual replies.
pub replies: Vec<ApplyStagingManifestReply>,
}
impl ApplyStagingManifestsReply {
/// Wraps the given per-region replies.
pub fn new(replies: Vec<ApplyStagingManifestReply>) -> Self {
Self { replies }
}
}
/// Outcome of applying a staging manifest to a single region.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ApplyStagingManifestReply {
/// The region the reply is about.
pub region_id: RegionId,
/// `true` if the region is ready to serve reads and writes.
pub ready: bool,
/// Indicates whether the region exists.
pub exists: bool,
/// The error message, if an error occurred during the operation.
pub error: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InstructionReply {
@@ -758,6 +847,8 @@ pub enum InstructionReply {
GetFileRefs(GetFileRefsReply),
GcRegions(GcRegionsReply),
EnterStagingRegions(EnterStagingRegionsReply),
RemapManifest(RemapManifestReply),
ApplyStagingManifests(ApplyStagingManifestsReply),
}
impl Display for InstructionReply {
@@ -781,6 +872,12 @@ impl Display for InstructionReply {
reply.replies
)
}
Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply),
Self::ApplyStagingManifests(reply) => write!(
f,
"InstructionReply::ApplyStagingManifests({:?})",
reply.replies
),
}
}
}
@@ -828,6 +925,20 @@ impl InstructionReply {
_ => panic!("Expected EnterStagingRegion reply"),
}
}
/// Unwraps the reply as a [`RemapManifestReply`].
///
/// # Panics
///
/// Panics if `self` is any other variant.
pub fn expect_remap_manifest_reply(self) -> RemapManifestReply {
    let Self::RemapManifest(reply) = self else {
        panic!("Expected RemapManifest reply");
    };
    reply
}
/// Unwraps the reply as the list of per-region [`ApplyStagingManifestReply`]s.
///
/// # Panics
///
/// Panics if `self` is not the `ApplyStagingManifests` variant.
pub fn expect_apply_staging_manifests_reply(self) -> Vec<ApplyStagingManifestReply> {
    match self {
        Self::ApplyStagingManifests(reply) => reply.replies,
        // Name the variant exactly so the panic message points at the right instruction.
        _ => panic!("Expected ApplyStagingManifests reply"),
    }
}
}
#[cfg(test)]

View File

@@ -77,4 +77,5 @@ common-query.workspace = true
common-test-util.workspace = true
datafusion-common.workspace = true
mito2 = { workspace = true, features = ["test"] }
partition.workspace = true
session.workspace = true

View File

@@ -201,6 +201,7 @@ pub enum Error {
ShutdownServer {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: servers::error::Error,
},
@@ -208,6 +209,7 @@ pub enum Error {
ShutdownInstance {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: BoxedError,
},

View File

@@ -22,6 +22,7 @@ use common_telemetry::error;
use snafu::OptionExt;
use store_api::storage::GcReport;
mod apply_staging_manifest;
mod close_region;
mod downgrade_region;
mod enter_staging;
@@ -29,8 +30,10 @@ mod file_ref;
mod flush_region;
mod gc_worker;
mod open_region;
mod remap_manifest;
mod upgrade_region;
use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
use crate::heartbeat::handler::close_region::CloseRegionsHandler;
use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
@@ -38,6 +41,7 @@ use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
use crate::heartbeat::handler::open_region::OpenRegionsHandler;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
use crate::heartbeat::task_tracker::TaskTracker;
use crate::region_server::RegionServer;
@@ -128,6 +132,10 @@ impl RegionHeartbeatResponseHandler {
Instruction::EnterStagingRegions(_) => {
Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
}
Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
Instruction::ApplyStagingManifests(_) => {
Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
}
}
}
}
@@ -142,6 +150,8 @@ pub enum InstructionHandlers {
GetFileRefs(GetFileRefsHandler),
GcRegions(GcRegionsHandler),
EnterStagingRegions(EnterStagingRegionsHandler),
RemapManifest(RemapManifestHandler),
ApplyStagingManifests(ApplyStagingManifestsHandler),
}
macro_rules! impl_from_handler {
@@ -164,7 +174,9 @@ impl_from_handler!(
UpgradeRegionsHandler => UpgradeRegions,
GetFileRefsHandler => GetFileRefs,
GcRegionsHandler => GcRegions,
EnterStagingRegionsHandler => EnterStagingRegions
EnterStagingRegionsHandler => EnterStagingRegions,
RemapManifestHandler => RemapManifest,
ApplyStagingManifestsHandler => ApplyStagingManifests
);
macro_rules! dispatch_instr {
@@ -209,7 +221,9 @@ dispatch_instr!(
UpgradeRegions => UpgradeRegions,
GetFileRefs => GetFileRefs,
GcRegions => GcRegions,
EnterStagingRegions => EnterStagingRegions
EnterStagingRegions => EnterStagingRegions,
RemapManifest => RemapManifest,
ApplyStagingManifests => ApplyStagingManifests,
);
#[async_trait]

View File

@@ -0,0 +1,287 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{
ApplyStagingManifest, ApplyStagingManifestReply, ApplyStagingManifestsReply, InstructionReply,
};
use common_telemetry::{error, warn};
use futures::future::join_all;
use store_api::region_request::{ApplyStagingManifestRequest, RegionRequest};
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
/// Heartbeat instruction handler for a batch of [`ApplyStagingManifest`] requests.
pub struct ApplyStagingManifestsHandler;

#[async_trait::async_trait]
impl InstructionHandler for ApplyStagingManifestsHandler {
    type Instruction = Vec<ApplyStagingManifest>;

    /// Handles every request of the batch through `join_all` and returns the
    /// replies in request order. Always returns `Some`.
    async fn handle(
        &self,
        ctx: &HandlerContext,
        requests: Self::Instruction,
    ) -> Option<InstructionReply> {
        let pending = requests
            .into_iter()
            .map(|request| Self::handle_apply_staging_manifest(ctx, request));
        let replies = join_all(pending).await;

        let reply = ApplyStagingManifestsReply::new(replies);
        Some(InstructionReply::ApplyStagingManifests(reply))
    }
}
impl ApplyStagingManifestsHandler {
async fn handle_apply_staging_manifest(
ctx: &HandlerContext,
request: ApplyStagingManifest,
) -> ApplyStagingManifestReply {
let Some(leader) = ctx.region_server.is_region_leader(request.region_id) else {
warn!("Region: {} is not found", request.region_id);
return ApplyStagingManifestReply {
region_id: request.region_id,
exists: false,
ready: false,
error: None,
};
};
if !leader {
warn!("Region: {} is not leader", request.region_id);
return ApplyStagingManifestReply {
region_id: request.region_id,
exists: true,
ready: false,
error: Some("Region is not leader".into()),
};
}
match ctx
.region_server
.handle_request(
request.region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: request.partition_expr,
central_region_id: request.central_region_id,
manifest_path: request.manifest_path,
}),
)
.await
{
Ok(_) => ApplyStagingManifestReply {
region_id: request.region_id,
exists: true,
ready: true,
error: None,
},
Err(err) => {
error!(err; "Failed to apply staging manifest");
ApplyStagingManifestReply {
region_id: request.region_id,
exists: true,
ready: false,
error: Some(format!("{err:?}")),
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use common_meta::instruction::RemapManifest;
use datatypes::value::Value;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use partition::expr::{PartitionExpr, col};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionRole;
use store_api::region_request::EnterStagingRequest;
use store_api::storage::RegionId;
use super::*;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
use crate::region_server::RegionServer;
use crate::tests::{MockRegionEngine, mock_region_server};
/// A request for a region that was never registered must report
/// `exists = false`, `ready = false` and no error.
#[tokio::test]
async fn test_region_not_exist() {
    let mut region_server = mock_region_server();
    let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
    region_server.register_engine(engine);
    let ctx = HandlerContext::new_for_test(region_server);

    let request = ApplyStagingManifest {
        region_id: RegionId::new(1024, 1),
        partition_expr: "".to_string(),
        central_region_id: RegionId::new(1024, 9999), // use a dummy value
        manifest_path: "".to_string(),
    };
    let replies = ApplyStagingManifestsHandler
        .handle(&ctx, vec![request])
        .await
        .unwrap()
        .expect_apply_staging_manifests_reply();

    let first = &replies[0];
    assert!(!first.exists);
    assert!(!first.ready);
    assert!(first.error.is_none());
}
/// A request for a follower region must report `exists = true`,
/// `ready = false` and carry an error.
#[tokio::test]
async fn test_region_not_leader() {
    let region_id = RegionId::new(1024, 1);
    let region_server = mock_region_server();
    let (follower_engine, _) =
        MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
            region_engine.mock_role = Some(Some(RegionRole::Follower));
            region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
        });
    region_server.register_test_region(region_id, follower_engine);
    let ctx = HandlerContext::new_for_test(region_server);

    let request = ApplyStagingManifest {
        region_id,
        partition_expr: "".to_string(),
        central_region_id: RegionId::new(1024, 2),
        manifest_path: "".to_string(),
    };
    let replies = ApplyStagingManifestsHandler
        .handle(&ctx, vec![request])
        .await
        .unwrap()
        .expect_apply_staging_manifests_reply();

    let first = &replies[0];
    assert!(first.exists);
    assert!(!first.ready);
    assert!(first.error.is_some());
}
/// Builds the partition expression `col_name >= start AND col_name < end`.
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
    let lower_bound = col(col_name).gt_eq(Value::Int64(start));
    let upper_bound = col(col_name).lt(Value::Int64(end));
    lower_bound.and(upper_bound)
}
/// Creates regions `(1024, 1)` and `(1024, 2)` under the same table dir and
/// sends each an `EnterStaging` request with partition expressions
/// `x in [0, 49)` and `x in [49, 100)` respectively.
async fn prepare_region(region_server: &RegionServer) {
    let staged_exprs = [
        (RegionId::new(1024, 1), range_expr("x", 0, 49)),
        (RegionId::new(1024, 2), range_expr("x", 49, 100)),
    ];

    for (region_id, staged_expr) in staged_exprs {
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir("test", 1024);
        let create = RegionRequest::Create(create_req);
        region_server
            .handle_request(region_id, create)
            .await
            .unwrap();

        let enter_staging = RegionRequest::EnterStaging(EnterStagingRequest {
            partition_expr: staged_expr.as_json_str().unwrap(),
        });
        region_server
            .handle_request(region_id, enter_staging)
            .await
            .unwrap();
    }
}
/// End-to-end check against a real mito engine: remap the manifests of two
/// staged regions, then apply the resulting staging manifests.
///
/// Applying with the partition expression the region was staged with is
/// expected to succeed; applying with a different one is expected to fail.
#[tokio::test]
async fn test_apply_staging_manifest() {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let mut engine_env = TestEnv::new().await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine.clone()));
// Both regions are created and put into staging before any instruction is handled.
prepare_region(&region_server).await;
let handler_context = HandlerContext::new_for_test(region_server);
let region_id2 = RegionId::new(1024, 2);
// Step 1: remap manifests; region 1 is the addressed region of the instruction.
let reply = RemapManifestHandler
.handle(
&handler_context,
RemapManifest {
region_id,
input_regions: vec![region_id, region_id2],
region_mapping: HashMap::from([
// [0,49) <- [0, 50)
(region_id, vec![region_id]),
// [49, 100) <- [0, 50), [50,100)
(region_id2, vec![region_id, region_id2]),
]),
new_partition_exprs: HashMap::from([
(region_id, range_expr("x", 0, 49).as_json_str().unwrap()),
(region_id2, range_expr("x", 49, 100).as_json_str().unwrap()),
]),
},
)
.await
.unwrap();
let reply = reply.expect_remap_manifest_reply();
assert!(reply.exists);
assert!(reply.error.is_none(), "{}", reply.error.unwrap());
assert_eq!(reply.manifest_paths.len(), 2);
let manifest_path_1 = reply.manifest_paths[&region_id].clone();
let manifest_path_2 = reply.manifest_paths[&region_id2].clone();
// Step 2: apply region 1's staging manifest with the expression used in `prepare_region`.
let reply = ApplyStagingManifestsHandler
.handle(
&handler_context,
vec![ApplyStagingManifest {
region_id,
partition_expr: range_expr("x", 0, 49).as_json_str().unwrap(),
central_region_id: region_id,
manifest_path: manifest_path_1,
}],
)
.await
.unwrap();
let replies = reply.expect_apply_staging_manifests_reply();
let reply = &replies[0];
assert!(reply.exists);
assert!(reply.ready);
assert!(reply.error.is_none());
// partition expr mismatch: region 2 was staged with [49, 100) but [50, 100) is sent here.
let reply = ApplyStagingManifestsHandler
.handle(
&handler_context,
vec![ApplyStagingManifest {
region_id: region_id2,
partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
central_region_id: region_id,
manifest_path: manifest_path_2,
}],
)
.await
.unwrap();
let replies = reply.expect_apply_staging_manifests_reply();
let reply = &replies[0];
assert!(reply.exists);
assert!(!reply.ready);
assert!(reply.error.is_some());
}
}

View File

@@ -0,0 +1,246 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
use common_telemetry::warn;
use store_api::region_engine::RemapManifestsRequest;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
pub struct RemapManifestHandler;
#[async_trait::async_trait]
impl InstructionHandler for RemapManifestHandler {
type Instruction = RemapManifest;
async fn handle(
&self,
ctx: &HandlerContext,
request: Self::Instruction,
) -> Option<InstructionReply> {
let RemapManifest {
region_id,
input_regions,
region_mapping,
new_partition_exprs,
} = request;
let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
warn!("Region: {} is not found", region_id);
return Some(InstructionReply::RemapManifest(RemapManifestReply {
exists: false,
manifest_paths: Default::default(),
error: None,
}));
};
if !leader {
warn!("Region: {} is not leader", region_id);
return Some(InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some("Region is not leader".into()),
}));
}
let reply = match ctx
.region_server
.remap_manifests(RemapManifestsRequest {
region_id,
input_regions,
region_mapping,
new_partition_exprs,
})
.await
{
Ok(result) => InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: result.manifest_paths,
error: None,
}),
Err(e) => InstructionReply::RemapManifest(RemapManifestReply {
exists: true,
manifest_paths: Default::default(),
error: Some(format!("{e:?}")),
}),
};
Some(reply)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use common_meta::instruction::RemapManifest;
use datatypes::value::Value;
use mito2::config::MitoConfig;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use partition::expr::{PartitionExpr, col};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionRole;
use store_api::region_request::{EnterStagingRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
use crate::region_server::RegionServer;
use crate::tests::{MockRegionEngine, mock_region_server};
/// A remap request for a region that was never registered must report
/// `exists = false`, no error and no manifest paths.
#[tokio::test]
async fn test_region_not_exist() {
    let mut region_server = mock_region_server();
    let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
    region_server.register_engine(engine);
    let ctx = HandlerContext::new_for_test(region_server);

    let request = RemapManifest {
        region_id: RegionId::new(1024, 1),
        input_regions: vec![],
        region_mapping: HashMap::new(),
        new_partition_exprs: HashMap::new(),
    };
    let remap_reply = RemapManifestHandler
        .handle(&ctx, request)
        .await
        .unwrap()
        .expect_remap_manifest_reply();

    assert!(!remap_reply.exists);
    assert!(remap_reply.error.is_none());
    assert!(remap_reply.manifest_paths.is_empty());
}
/// A remap request addressed to a follower region must report
/// `exists = true` and carry an error.
#[tokio::test]
async fn test_region_not_leader() {
    let region_server = mock_region_server();
    let region_id = RegionId::new(1024, 1);
    let (follower_engine, _) =
        MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
            region_engine.mock_role = Some(Some(RegionRole::Follower));
            region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
        });
    region_server.register_test_region(region_id, follower_engine);
    let ctx = HandlerContext::new_for_test(region_server);

    let request = RemapManifest {
        region_id,
        input_regions: vec![],
        region_mapping: HashMap::new(),
        new_partition_exprs: HashMap::new(),
    };
    let remap_reply = RemapManifestHandler
        .handle(&ctx, request)
        .await
        .unwrap()
        .expect_remap_manifest_reply();

    assert!(remap_reply.exists);
    assert!(remap_reply.error.is_some());
}
/// Builds the partition expression `col_name >= start AND col_name < end`.
fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
    let lower_bound = col(col_name).gt_eq(Value::Int64(start));
    let upper_bound = col(col_name).lt(Value::Int64(end));
    lower_bound.and(upper_bound)
}
/// Creates regions `(1024, 1)` and `(1024, 2)` under the same table dir and
/// sends each an `EnterStaging` request with partition expressions
/// `x in [0, 50)` and `x in [50, 100)` respectively.
async fn prepare_region(region_server: &RegionServer) {
    let staged_exprs = [
        (RegionId::new(1024, 1), range_expr("x", 0, 50)),
        (RegionId::new(1024, 2), range_expr("x", 50, 100)),
    ];

    for (region_id, staged_expr) in staged_exprs {
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir("test", 1024);
        let create = RegionRequest::Create(create_req);
        region_server
            .handle_request(region_id, create)
            .await
            .unwrap();

        let enter_staging = RegionRequest::EnterStaging(EnterStagingRequest {
            partition_expr: staged_expr.as_json_str().unwrap(),
        });
        region_server
            .handle_request(region_id, enter_staging)
            .await
            .unwrap();
    }
}
/// End-to-end check against a real mito engine: remapping two staged regions
/// into one succeeds, and a second request whose `region_mapping` mentions a
/// region missing from `input_regions` is expected to fail.
#[tokio::test]
async fn test_remap_manifest() {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let region_id = RegionId::new(1024, 1);
let mut engine_env = TestEnv::new().await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine.clone()));
// Both regions are created and put into staging before any instruction is handled.
prepare_region(&region_server).await;
let handler_context = HandlerContext::new_for_test(region_server);
let region_id2 = RegionId::new(1024, 2);
// Merge case: both input regions map onto region 1, which gets the range [0, 100).
let reply = RemapManifestHandler
.handle(
&handler_context,
RemapManifest {
region_id,
input_regions: vec![region_id, region_id2],
region_mapping: HashMap::from([
(region_id, vec![region_id]),
(region_id2, vec![region_id]),
]),
new_partition_exprs: HashMap::from([(
region_id,
range_expr("x", 0, 100).as_json_str().unwrap(),
)]),
},
)
.await
.unwrap();
let reply = reply.expect_remap_manifest_reply();
assert!(reply.exists);
assert!(reply.error.is_none(), "{}", reply.error.unwrap());
assert_eq!(reply.manifest_paths.len(), 1);
// Remap failed
// NOTE(review): the only difference from the request above is that `input_regions`
// omits region 2 while `region_mapping` still references it — presumably the cause
// of the failure; confirm against the engine's validation.
let reply = RemapManifestHandler
.handle(
&handler_context,
RemapManifest {
region_id,
input_regions: vec![region_id],
region_mapping: HashMap::from([
(region_id, vec![region_id]),
(region_id2, vec![region_id]),
]),
new_partition_exprs: HashMap::from([(
region_id,
range_expr("x", 0, 100).as_json_str().unwrap(),
)]),
},
)
.await
.unwrap();
let reply = reply.expect_remap_manifest_reply();
assert!(reply.exists);
assert!(reply.error.is_some());
assert!(reply.manifest_paths.is_empty());
}
}

View File

@@ -65,8 +65,9 @@ use store_api::metric_engine_consts::{
FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
};
use store_api::region_engine::{
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState, SyncRegionFromRequest,
RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest,
RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
SyncRegionFromRequest,
};
use store_api::region_request::{
AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest,
@@ -604,6 +605,25 @@ impl RegionServer {
.await
}
/// Remaps manifests from old regions to new regions.
///
/// The request is forwarded to the engine registered for `request.region_id`.
///
/// # Errors
///
/// Returns a region-not-found error if `request.region_id` is not in the
/// region map, and wraps any engine failure with the region id as context.
pub async fn remap_manifests(
    &self,
    request: RemapManifestsRequest,
) -> Result<RemapManifestsResponse> {
    let region_id = request.region_id;
    let entry = self
        .inner
        .region_map
        .get(&region_id)
        .with_context(|| RegionNotFoundSnafu { region_id })?;

    let outcome = entry.engine().remap_manifests(request).await;
    outcome.with_context(|_| HandleRegionRequestSnafu { region_id })
}
/// Returns the current value of the `suspend` flag, loaded with relaxed ordering.
fn is_suspended(&self) -> bool {
self.suspend.load(Ordering::Relaxed)
}

View File

@@ -13,18 +13,41 @@
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
use crate::error::Result;
use crate::procedure::repartition::collect::{Collect, ProcedureMeta};
use crate::procedure::repartition::group::RepartitionGroupProcedure;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::{self, Context, State};
/// Repartition procedure state that iterates the plans in the persistent
/// context and builds a `RepartitionGroupProcedure` for each of them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Dispatch;
/// Translates an index-based transition map into a region-id-based mapping.
///
/// `transition_map[i]` lists the indices (into `target_regions`) of the
/// targets associated with `source_regions[i]`. The result maps each source
/// region id to the ids of those target regions.
///
/// # Panics
///
/// Panics if `transition_map` has more entries than `source_regions`, or if
/// it contains an index that is out of bounds for `target_regions`.
#[allow(dead_code)]
fn build_region_mapping(
    source_regions: &[RegionDescriptor],
    target_regions: &[RegionDescriptor],
    transition_map: &[Vec<usize>],
) -> HashMap<RegionId, Vec<RegionId>> {
    let mut mapping = HashMap::with_capacity(transition_map.len());
    for (source_idx, target_indices) in transition_map.iter().enumerate() {
        let source_id = source_regions[source_idx].region_id;
        let target_ids: Vec<RegionId> = target_indices
            .iter()
            .map(|&target_idx| target_regions[target_idx].region_id)
            .collect();
        mapping.insert(source_id, target_ids);
    }
    mapping
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for Dispatch {
@@ -37,11 +60,19 @@ impl State for Dispatch {
let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len());
let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len());
for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() {
let region_mapping = build_region_mapping(
&plan.source_regions,
&plan.target_regions,
&plan.transition_map,
);
let persistent_ctx = repartition::group::PersistentContext::new(
plan.group_id,
table_id,
ctx.persistent_ctx.catalog_name.clone(),
ctx.persistent_ctx.schema_name.clone(),
plan.source_regions.clone(),
plan.target_regions.clone(),
region_mapping,
);
let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx);

View File

@@ -12,27 +12,34 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod apply_staging_manifest;
pub(crate) mod enter_staging_region;
pub(crate) mod remap_manifest;
pub(crate) mod repartition_end;
pub(crate) mod repartition_start;
pub(crate) mod update_metadata;
pub(crate) mod utils;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::DatanodeId;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
UserMetadata,
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::error;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
@@ -71,6 +78,12 @@ impl RepartitionGroupProcedure {
}
}
/// Borrowed, serialize-only view of a repartition group procedure: its
/// persistent context together with its current state.
#[derive(Debug, Serialize)]
pub struct RepartitionGroupData<'a> {
/// The persistent context of the procedure.
persistent_ctx: &'a PersistentContext,
/// The current state of the procedure.
state: &'a dyn State,
}
#[async_trait::async_trait]
impl Procedure for RepartitionGroupProcedure {
fn type_name(&self) -> &str {
@@ -78,27 +91,48 @@ impl Procedure for RepartitionGroupProcedure {
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
todo!()
}
let state = &mut self.state;
async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> {
todo!()
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
Ok(status)
}
Err(e) => {
if e.is_retryable() {
Err(ProcedureError::retry_later(e))
} else {
error!(
e;
"Repartition group procedure failed, group id: {}, table id: {}",
self.context.persistent_ctx.group_id,
self.context.persistent_ctx.table_id,
);
Err(ProcedureError::external(e))
}
}
}
}
fn rollback_supported(&self) -> bool {
true
false
}
fn dump(&self) -> ProcedureResult<String> {
todo!()
let data = RepartitionGroupData {
persistent_ctx: &self.context.persistent_ctx,
state: self.state.as_ref(),
};
serde_json::to_string(&data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
todo!()
LockKey::new(self.context.persistent_ctx.lock_key())
}
fn user_metadata(&self) -> Option<UserMetadata> {
todo!()
// TODO(weny): support user metadata.
None
}
}
@@ -123,8 +157,8 @@ pub struct GroupPrepareResult {
pub target_routes: Vec<RegionRoute>,
/// The primary source region id (first source region), used for retrieving region options.
pub central_region: RegionId,
/// The datanode id where the primary source region is located.
pub central_region_datanode_id: DatanodeId,
/// The peer where the primary source region is located.
pub central_region_datanode: Peer,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -132,30 +166,59 @@ pub struct PersistentContext {
pub group_id: GroupId,
/// The table id of the repartition group.
pub table_id: TableId,
/// The catalog name of the repartition group.
pub catalog_name: String,
/// The schema name of the repartition group.
pub schema_name: String,
/// The source regions of the repartition group.
pub sources: Vec<RegionDescriptor>,
/// The target regions of the repartition group.
pub targets: Vec<RegionDescriptor>,
/// For each `source region`, the corresponding
/// `target regions` that overlap with it.
pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
/// The result of group prepare.
/// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state.
pub group_prepare_result: Option<GroupPrepareResult>,
/// The staging manifest paths of the repartition group.
/// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state.
pub staging_manifest_paths: HashMap<RegionId, String>,
}
impl PersistentContext {
    /// Creates the persistent context of a repartition group.
    ///
    /// `group_prepare_result` starts as `None` and `staging_manifest_paths`
    /// starts empty.
    pub fn new(
        group_id: GroupId,
        table_id: TableId,
        catalog_name: String,
        schema_name: String,
        sources: Vec<RegionDescriptor>,
        targets: Vec<RegionDescriptor>,
        region_mapping: HashMap<RegionId, Vec<RegionId>>,
    ) -> Self {
        let group_prepare_result = None;
        let staging_manifest_paths = HashMap::new();
        Self {
            group_id,
            table_id,
            catalog_name,
            schema_name,
            sources,
            targets,
            region_mapping,
            group_prepare_result,
            staging_manifest_paths,
        }
    }

    /// Returns the lock keys of the group, in order: a read lock on the
    /// catalog, a read lock on the schema, then a write lock for every source
    /// region.
    pub fn lock_key(&self) -> Vec<StringKey> {
        let catalog_lock: StringKey = CatalogLock::Read(&self.catalog_name).into();
        let schema_lock: StringKey =
            SchemaLock::read(&self.catalog_name, &self.schema_name).into();
        let region_locks = self
            .sources
            .iter()
            .map(|source| -> StringKey { RegionLock::Write(source.region_id).into() });

        [catalog_lock, schema_lock]
            .into_iter()
            .chain(region_locks)
            .collect()
    }
}
impl Context {
@@ -253,7 +316,7 @@ impl Context {
// Safety: prepare result is set in [RepartitionStart] state.
let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
let central_region_datanode_table_value = self
.get_datanode_table_value(table_id, prepare_result.central_region_datanode_id)
.get_datanode_table_value(table_id, prepare_result.central_region_datanode.id)
.await?;
let RegionInfo {
region_options,

View File

@@ -0,0 +1,333 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{
ApplyStagingManifestReply, ApplyStagingManifestsReply, Instruction, InstructionReply,
};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
use crate::procedure::repartition::group::{Context, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::service::mailbox::{Channel, MailboxRef};
/// State of the repartition group procedure that sends the apply staging
/// manifest instructions to the datanodes leading the target regions and
/// waits for their replies.
#[derive(Debug, Serialize, Deserialize)]
pub struct ApplyStagingManifest;
#[async_trait::async_trait]
#[typetag::serde]
impl State for ApplyStagingManifest {
    /// Applies the staging manifests on all target regions, then hands over to
    /// [`UpdateMetadata::ApplyStaging`].
    ///
    /// Any error from applying the staging manifests is returned unchanged and
    /// no state transition happens.
    ///
    /// NOTE(review): `UpdateMetadata::ApplyStaging` is also the state that
    /// leads into `EnterStagingRegion` -> `RemapManifest` -> this state.
    /// Confirm that returning to it here is intended and not a placeholder.
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        self.apply_staging_manifests(ctx).await?;

        let following: Box<dyn State> = Box::new(UpdateMetadata::ApplyStaging);
        Ok((following, Status::executing(true)))
    }

    /// Exposes the state as [`Any`].
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl ApplyStagingManifest {
fn build_apply_staging_manifest_instructions(
staging_manifest_paths: &HashMap<RegionId, String>,
target_routes: &[RegionRoute],
targets: &[RegionDescriptor],
central_region_id: RegionId,
) -> Result<HashMap<Peer, Vec<common_meta::instruction::ApplyStagingManifest>>> {
let target_partition_expr_by_region = targets
.iter()
.map(|target| {
Ok((
target.region_id,
target
.partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?,
))
})
.collect::<Result<HashMap<_, _>>>()?;
// Safety: `leader_peer` is set for all region routes, checked in `repartition_start`.
let target_region_routes_by_peer = group_region_routes_by_peer(target_routes);
let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
for (peer, region_ids) in target_region_routes_by_peer {
let apply_staging_manifests = region_ids
.into_iter()
.map(|region_id| common_meta::instruction::ApplyStagingManifest {
region_id,
partition_expr: target_partition_expr_by_region[&region_id].clone(),
central_region_id,
manifest_path: staging_manifest_paths[&region_id].clone(),
})
.collect();
instructions.insert(peer.clone(), apply_staging_manifests);
}
Ok(instructions)
}
/// Sends the apply staging manifest instructions to every datanode that leads
/// a target region of the group and waits until all of them have replied.
///
/// # Errors
///
/// * `ExceededDeadline` if `ctx.next_operation_timeout()` does not yield a
///   timeout.
/// * `RetryLater` if every failed datanode request failed with a retryable
///   error.
/// * `Unexpected` if the failures are all non-retryable, or a mix of
///   retryable and non-retryable errors.
/// * Any error from building the instructions.
async fn apply_staging_manifests(&self, ctx: &mut Context) -> Result<()> {
    let table_id = ctx.persistent_ctx.table_id;
    let group_id = ctx.persistent_ctx.group_id;
    let staging_manifest_paths = &ctx.persistent_ctx.staging_manifest_paths;
    // Safety: the group prepare result is set in the RepartitionStart state.
    let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
    let targets = &ctx.persistent_ctx.targets;
    let target_routes = &prepare_result.target_routes;
    let central_region_id = prepare_result.central_region;
    let instructions = Self::build_apply_staging_manifest_instructions(
        staging_manifest_paths,
        target_routes,
        targets,
        central_region_id,
    )?;
    let operation_timeout =
        ctx.next_operation_timeout()
            .context(error::ExceededDeadlineSnafu {
                operation: "Apply staging manifests",
            })?;
    // `peers[i]` is the datanode targeted by `tasks[i]`. The tasks are only
    // created here; they are driven by `join_all` below.
    let (peers, tasks): (Vec<_>, Vec<_>) = instructions
        .iter()
        .map(|(peer, apply_staging_manifests)| {
            (
                peer,
                Self::apply_staging_manifest(
                    &ctx.mailbox,
                    &ctx.server_addr,
                    peer,
                    apply_staging_manifests,
                    operation_timeout,
                ),
            )
        })
        .unzip();
    info!(
        "Sent apply staging manifests instructions to peers: {:?} for repartition table {}, group id {}",
        peers, table_id, group_id
    );

    // Maps the index of a failed task back to the datanode it was sent to.
    let format_err_msg = |idx: usize, error: &Error| {
        let peer = peers[idx];
        format!(
            "Failed to apply staging manifests on datanode {:?}, error: {:?}",
            peer, error
        )
    };
    // Waits for all tasks to complete.
    let results = join_all(tasks).await;
    let result = handle_multiple_results(&results);
    match result {
        HandleMultipleResult::AllSuccessful => Ok(()),
        HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
            reason: format!(
                "All retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}",
                table_id, group_id,
                retryable_errors
                    .iter()
                    .map(|(idx, error)| format_err_msg(*idx, error))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
        }
        .fail(),
        HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
            violated: format!(
                "All non retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}",
                table_id, group_id,
                non_retryable_errors
                    .iter()
                    .map(|(idx, error)| format_err_msg(*idx, error))
                    .collect::<Vec<_>>()
                    .join(",")
            ),
        }
        .fail(),
        // A mix of retryable and non-retryable failures is not retried.
        HandleMultipleResult::PartialRetryable {
            retryable_errors,
            non_retryable_errors,
        } => error::UnexpectedSnafu {
            violated: format!(
                "Partial retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}, non retryable errors: {:?}",
                table_id, group_id,
                retryable_errors
                    .iter()
                    .map(|(idx, error)| format_err_msg(*idx, error))
                    .collect::<Vec<_>>()
                    .join(","),
                non_retryable_errors
                    .iter()
                    .map(|(idx, error)| format_err_msg(*idx, error))
                    .collect::<Vec<_>>()
                    .join(","),
            ),
        }
        .fail(),
    }
}
async fn apply_staging_manifest(
mailbox: &MailboxRef,
server_addr: &str,
peer: &Peer,
apply_staging_manifests: &[common_meta::instruction::ApplyStagingManifest],
timeout: Duration,
) -> Result<()> {
let ch = Channel::Datanode(peer.id);
let instruction = Instruction::ApplyStagingManifests(apply_staging_manifests.to_vec());
let message = MailboxMessage::json_message(
&format!(
"Apply staging manifests for regions: {:?}",
apply_staging_manifests
.iter()
.map(|r| r.region_id)
.collect::<Vec<_>>()
),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let now = Instant::now();
let receiver = mailbox.send(&ch, message, timeout).await;
let receiver = match receiver {
Ok(receiver) => receiver,
Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
reason: format!(
"Pusher not found for apply staging manifests on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
),
}
.fail()?,
Err(err) => {
return Err(err);
}
};
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received apply staging manifests reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let InstructionReply::ApplyStagingManifests(ApplyStagingManifestsReply { replies }) =
reply
else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect apply staging manifests reply",
}
.fail();
};
for reply in replies {
Self::handle_apply_staging_manifest_reply(&reply, &now, peer)?;
}
Ok(())
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for apply staging manifests on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
);
error::RetryLaterSnafu { reason }.fail()
}
Err(err) => Err(err),
}
}
fn handle_apply_staging_manifest_reply(
ApplyStagingManifestReply {
region_id,
ready,
exists,
error,
}: &ApplyStagingManifestReply,
now: &Instant,
peer: &Peer,
) -> Result<()> {
ensure!(
exists,
error::UnexpectedSnafu {
violated: format!(
"Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
region_id,
peer,
now.elapsed()
)
}
);
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to apply staging manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
peer,
error,
now.elapsed()
),
}
.fail();
}
ensure!(
ready,
error::RetryLaterSnafu {
reason: format!(
"Region {} is still applying staging manifest on datanode {:?}, elapsed: {:?}",
region_id,
peer,
now.elapsed()
),
}
);
Ok(())
}
}

View File

@@ -29,6 +29,7 @@ use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::remap_manifest::RemapManifest;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
@@ -49,7 +50,7 @@ impl State for EnterStagingRegion {
) -> Result<(Box<dyn State>, Status)> {
self.enter_staging_regions(ctx).await?;
Ok(Self::next_state())
Ok((Box::new(RemapManifest), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
@@ -58,16 +59,10 @@ impl State for EnterStagingRegion {
}
impl EnterStagingRegion {
#[allow(dead_code)]
fn next_state() -> (Box<dyn State>, Status) {
// TODO(weny): change it later.
(Box::new(EnterStagingRegion), Status::executing(true))
}
fn build_enter_staging_instructions(
prepare_result: &GroupPrepareResult,
targets: &[RegionDescriptor],
) -> Result<HashMap<Peer, Instruction>> {
) -> Result<HashMap<Peer, Vec<common_meta::instruction::EnterStagingRegion>>> {
let target_partition_expr_by_region = targets
.iter()
.map(|target| {
@@ -93,10 +88,7 @@ impl EnterStagingRegion {
partition_expr: target_partition_expr_by_region[&region_id].clone(),
})
.collect();
instructions.insert(
peer.clone(),
Instruction::EnterStagingRegions(enter_staging_regions),
);
instructions.insert(peer.clone(), enter_staging_regions);
}
Ok(instructions)
@@ -117,14 +109,14 @@ impl EnterStagingRegion {
})?;
let (peers, tasks): (Vec<_>, Vec<_>) = instructions
.iter()
.map(|(peer, instruction)| {
.map(|(peer, enter_staging_regions)| {
(
peer,
Self::enter_staging_region(
&ctx.mailbox,
&ctx.server_addr,
peer,
instruction,
enter_staging_regions,
operation_timeout,
),
)
@@ -208,12 +200,19 @@ impl EnterStagingRegion {
mailbox: &MailboxRef,
server_addr: &str,
peer: &Peer,
instruction: &Instruction,
enter_staging_regions: &[common_meta::instruction::EnterStagingRegion],
timeout: Duration,
) -> Result<()> {
let ch = Channel::Datanode(peer.id);
let instruction = Instruction::EnterStagingRegions(enter_staging_regions.to_vec());
let message = MailboxMessage::json_message(
&format!("Enter staging regions: {:?}", instruction),
&format!(
"Enter staging regions: {:?}",
enter_staging_regions
.iter()
.map(|r| r.region_id)
.collect::<Vec<_>>()
),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
@@ -328,7 +327,6 @@ mod tests {
use std::assert_matches::assert_matches;
use std::time::Duration;
use common_meta::instruction::Instruction;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
@@ -376,7 +374,7 @@ mod tests {
},
],
central_region: RegionId::new(table_id, 1),
central_region_datanode_id: 1,
central_region_datanode: Peer::empty(1),
};
let targets = test_targets();
let instructions =
@@ -384,12 +382,7 @@ mod tests {
.unwrap();
assert_eq!(instructions.len(), 2);
let instruction_1 = instructions
.get(&Peer::empty(1))
.unwrap()
.clone()
.into_enter_staging_regions()
.unwrap();
let instruction_1 = instructions.get(&Peer::empty(1)).unwrap().clone();
assert_eq!(
instruction_1,
vec![common_meta::instruction::EnterStagingRegion {
@@ -397,12 +390,7 @@ mod tests {
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
}]
);
let instruction_2 = instructions
.get(&Peer::empty(2))
.unwrap()
.clone()
.into_enter_staging_regions()
.unwrap();
let instruction_2 = instructions.get(&Peer::empty(2)).unwrap().clone();
assert_eq!(
instruction_2,
vec![common_meta::instruction::EnterStagingRegion {
@@ -417,18 +405,17 @@ mod tests {
let env = TestingEnv::new();
let server_addr = "localhost";
let peer = Peer::empty(1);
let instruction =
Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
}]);
let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
}];
let timeout = Duration::from_secs(10);
let err = EnterStagingRegion::enter_staging_region(
env.mailbox_ctx.mailbox(),
server_addr,
&peer,
&instruction,
&enter_staging_regions,
timeout,
)
.await
@@ -447,11 +434,10 @@ mod tests {
.await;
let server_addr = "localhost";
let peer = Peer::empty(1);
let instruction =
Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
}]);
let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
}];
let timeout = Duration::from_secs(10);
// Sends a timeout error.
@@ -463,7 +449,7 @@ mod tests {
env.mailbox_ctx.mailbox(),
server_addr,
&peer,
&instruction,
&enter_staging_regions,
timeout,
)
.await
@@ -479,11 +465,10 @@ mod tests {
let server_addr = "localhost";
let peer = Peer::empty(1);
let instruction =
Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
}]);
let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
}];
let timeout = Duration::from_secs(10);
env.mailbox_ctx
@@ -498,7 +483,7 @@ mod tests {
env.mailbox_ctx.mailbox(),
server_addr,
&peer,
&instruction,
&enter_staging_regions,
timeout,
)
.await
@@ -516,11 +501,10 @@ mod tests {
.await;
let server_addr = "localhost";
let peer = Peer::empty(1);
let instruction =
Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
}]);
let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(),
}];
let timeout = Duration::from_secs(10);
// Sends a failed reply.
@@ -538,7 +522,7 @@ mod tests {
env.mailbox_ctx.mailbox(),
server_addr,
&peer,
&instruction,
&enter_staging_regions,
timeout,
)
.await
@@ -565,7 +549,7 @@ mod tests {
env.mailbox_ctx.mailbox(),
server_addr,
&peer,
&instruction,
&enter_staging_regions,
timeout,
)
.await
@@ -596,7 +580,7 @@ mod tests {
},
],
central_region: RegionId::new(table_id, 1),
central_region_datanode_id: 1,
central_region_datanode: Peer::empty(1),
}
}

View File

@@ -0,0 +1,222 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, RemapManifestReply};
use common_meta::peer::Peer;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::RegionId;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest;
use crate::procedure::repartition::group::{Context, State};
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::service::mailbox::{Channel, MailboxRef};
/// State of the repartition group procedure that sends a remap manifest
/// instruction to the datanode of the central region and stores the returned
/// staging manifest paths in the persistent context.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct RemapManifest;
#[async_trait::async_trait]
#[typetag::serde]
impl State for RemapManifest {
    /// Remaps the manifests on the central region's datanode, records the
    /// resulting staging manifest paths in the persistent context and moves on
    /// to [`ApplyStagingManifest`].
    ///
    /// A difference between the number of returned manifest paths and the
    /// number of target regions is only logged as a warning.
    ///
    /// # Errors
    ///
    /// Returns `ExceededDeadline` if `ctx.next_operation_timeout()` does not
    /// yield a timeout, and forwards any error from building or sending the
    /// remap instruction.
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let table_id = ctx.persistent_ctx.table_id;
        let group_id = ctx.persistent_ctx.group_id;
        // Safety: the group prepare result is set in the RepartitionStart state.
        let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();

        let instruction = Self::build_remap_manifest_instructions(
            &ctx.persistent_ctx.sources,
            &ctx.persistent_ctx.targets,
            &ctx.persistent_ctx.region_mapping,
            prepare_result.central_region,
        )?;
        let timeout = ctx
            .next_operation_timeout()
            .context(error::ExceededDeadlineSnafu {
                operation: "Remap manifests",
            })?;
        let staged_paths = Self::remap_manifests(
            &ctx.mailbox,
            &ctx.server_addr,
            &prepare_result.central_region_datanode,
            &instruction,
            timeout,
        )
        .await?;

        let expected = ctx.persistent_ctx.targets.len();
        let actual = staged_paths.len();
        if actual != expected {
            warn!(
                "Mismatch in manifest paths count: expected {}, got {}. This occurred during remapping manifests for group {} and table {}.",
                expected, actual, group_id, table_id
            );
        }

        ctx.persistent_ctx.staging_manifest_paths = staged_paths;
        let following: Box<dyn State> = Box::new(ApplyStagingManifest);
        Ok((following, Status::executing(true)))
    }

    /// Exposes the state as [`Any`].
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl RemapManifest {
fn build_remap_manifest_instructions(
source_regions: &[RegionDescriptor],
target_regions: &[RegionDescriptor],
region_mapping: &HashMap<RegionId, Vec<RegionId>>,
central_region_id: RegionId,
) -> Result<common_meta::instruction::RemapManifest> {
let new_partition_exprs = target_regions
.iter()
.map(|r| {
Ok((
r.region_id,
r.partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?,
))
})
.collect::<Result<HashMap<RegionId, String>>>()?;
Ok(common_meta::instruction::RemapManifest {
region_id: central_region_id,
input_regions: source_regions.iter().map(|r| r.region_id).collect(),
region_mapping: region_mapping.clone(),
new_partition_exprs,
})
}
async fn remap_manifests(
mailbox: &MailboxRef,
server_addr: &str,
peer: &Peer,
remap: &common_meta::instruction::RemapManifest,
timeout: Duration,
) -> Result<HashMap<RegionId, String>> {
let ch = Channel::Datanode(peer.id);
let instruction = Instruction::RemapManifest(remap.clone());
let message = MailboxMessage::json_message(
&format!(
"Remap manifests, central region: {}, input regions: {:?}",
remap.region_id, remap.input_regions
),
&format!("Metasrv@{}", server_addr),
&format!("Datanode-{}@{}", peer.id, peer.addr),
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
let now = Instant::now();
let receiver = mailbox.send(&ch, message, timeout).await;
let receiver = match receiver {
Ok(receiver) => receiver,
Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
reason: format!(
"Pusher not found for remap manifests on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
),
}
.fail()?,
Err(err) => {
return Err(err);
}
};
match receiver.await {
Ok(msg) => {
let reply = HeartbeatMailbox::json_reply(&msg)?;
info!(
"Received remap manifest reply: {:?}, elapsed: {:?}",
reply,
now.elapsed()
);
let InstructionReply::RemapManifest(reply) = reply else {
return error::UnexpectedInstructionReplySnafu {
mailbox_message: msg.to_string(),
reason: "expect remap manifest reply",
}
.fail();
};
Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer)
}
Err(error::Error::MailboxTimeout { .. }) => {
let reason = format!(
"Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}",
peer,
now.elapsed()
);
error::RetryLaterSnafu { reason }.fail()
}
Err(err) => Err(err),
}
}
fn handle_remap_manifest_reply(
region_id: RegionId,
RemapManifestReply {
exists,
manifest_paths,
error,
}: RemapManifestReply,
now: &Instant,
peer: &Peer,
) -> Result<HashMap<RegionId, String>> {
ensure!(
exists,
error::UnexpectedSnafu {
violated: format!(
"Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
region_id,
peer,
now.elapsed()
)
}
);
if error.is_some() {
return error::RetryLaterSnafu {
reason: format!(
"Failed to remap manifest on datanode {:?}, error: {:?}, elapsed: {:?}",
peer,
error,
now.elapsed()
),
}
.fail();
}
Ok(manifest_paths)
}
}

View File

@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::procedure::repartition::group::{Context, State};
/// Final state of the repartition group procedure; its `next` reports
/// `Status::done()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionEnd;
#[async_trait::async_trait]
#[typetag::serde]
impl State for RepartitionEnd {
    /// Finishes the procedure: returns another `RepartitionEnd` together with
    /// `Status::done()`. The contexts are not used.
    async fn next(
        &mut self,
        _ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let end: Box<dyn State> = Box::new(RepartitionEnd);
        Ok((end, Status::done()))
    }

    /// Exposes the state as [`Any`].
    fn as_any(&self) -> &dyn Any {
        self
    }
}

View File

@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Result};
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::{
Context, GroupId, GroupPrepareResult, State, region_routes,
};
@@ -109,7 +110,7 @@ impl RepartitionStart {
);
}
let central_region = sources[0].region_id;
let central_region_datanode_id = source_region_routes[0]
let central_region_datanode = source_region_routes[0]
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
@@ -118,20 +119,22 @@ impl RepartitionStart {
central_region
),
})?
.id;
.clone();
Ok(GroupPrepareResult {
source_routes: source_region_routes,
target_routes: target_region_routes,
central_region,
central_region_datanode_id,
central_region_datanode,
})
}
#[allow(dead_code)]
fn next_state() -> (Box<dyn State>, Status) {
// TODO(weny): change it later.
(Box::new(RepartitionStart), Status::executing(true))
(
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),
)
}
}

View File

@@ -17,12 +17,14 @@ pub(crate) mod rollback_staging_region;
use std::any::Any;
use common_meta::lock_key::TableLock;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion;
use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
use crate::procedure::repartition::group::{Context, State};
#[derive(Debug, Serialize, Deserialize)]
@@ -33,22 +35,16 @@ pub enum UpdateMetadata {
RollbackStaging,
}
impl UpdateMetadata {
#[allow(dead_code)]
fn next_state() -> (Box<dyn State>, Status) {
// TODO(weny): change it later.
(Box::new(RepartitionStart), Status::executing(true))
}
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpdateMetadata {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_lock = TableLock::Write(ctx.persistent_ctx.table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
match self {
UpdateMetadata::ApplyStaging => {
// TODO(weny): If all metadata have already been updated, skip applying staging regions.
@@ -59,7 +55,7 @@ impl State for UpdateMetadata {
"Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}"
);
};
Ok(Self::next_state())
Ok((Box::new(EnterStagingRegion), Status::executing(false)))
}
UpdateMetadata::RollbackStaging => {
self.rollback_staging_regions(ctx).await?;
@@ -69,7 +65,7 @@ impl State for UpdateMetadata {
"Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}"
);
};
Ok(Self::next_state())
Ok((Box::new(RepartitionEnd), Status::executing(false)))
}
}
}

View File

@@ -41,6 +41,9 @@ pub struct AllocationPlanEntry {
pub regions_to_allocate: usize,
/// The number of regions that need to be deallocated (source count - target count, if positive).
pub regions_to_deallocate: usize,
/// For each `source_regions[k]`, the corresponding vector contains global
/// `target_partition_exprs` that overlap with it.
pub transition_map: Vec<Vec<usize>>,
}
/// A plan entry for the dispatch phase after region allocation,
@@ -57,6 +60,9 @@ pub struct RepartitionPlanEntry {
pub allocated_region_ids: Vec<RegionId>,
/// The region ids of the regions that are pending deallocation.
pub pending_deallocate_region_ids: Vec<RegionId>,
/// For each `source_regions[k]`, the corresponding vector contains global
/// `target_regions` that overlap with it.
pub transition_map: Vec<Vec<usize>>,
}
impl RepartitionPlanEntry {
@@ -71,6 +77,7 @@ impl RepartitionPlanEntry {
target_partition_exprs,
regions_to_allocate,
regions_to_deallocate,
transition_map,
}: &AllocationPlanEntry,
) -> Self {
debug_assert!(*regions_to_allocate == 0 && *regions_to_deallocate == 0);
@@ -89,6 +96,7 @@ impl RepartitionPlanEntry {
target_regions,
allocated_region_ids: vec![],
pending_deallocate_region_ids: vec![],
transition_map: transition_map.clone(),
}
}
}

View File

@@ -129,6 +129,7 @@ impl RepartitionStart {
target_partition_exprs,
regions_to_allocate,
regions_to_deallocate,
transition_map: subtask.transition_map,
}
})
.collect::<Vec<_>>()

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
@@ -87,9 +88,13 @@ pub fn new_persistent_context(
) -> PersistentContext {
PersistentContext {
group_id: Uuid::new_v4(),
catalog_name: "test_catalog".to_string(),
schema_name: "test_schema".to_string(),
table_id,
sources,
targets,
region_mapping: HashMap::new(),
group_prepare_result: None,
staging_manifest_paths: HashMap::new(),
}
}