diff --git a/Cargo.lock b/Cargo.lock index 02f99d7290..e5f959cb50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4062,6 +4062,7 @@ dependencies = [ "mito2", "num_cpus", "object-store", + "partition", "prometheus", "prost 0.13.5", "query", diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 44a6d920f9..d581e3a295 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -530,6 +530,49 @@ impl Display for EnterStagingRegion { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct RemapManifest { + pub region_id: RegionId, + /// Regions to remap manifests from. + pub input_regions: Vec, + /// For each old region, which new regions should receive its files + pub region_mapping: HashMap>, + /// New partition expressions for the new regions. + pub new_partition_exprs: HashMap, +} + +impl Display for RemapManifest { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RemapManifest(region_id={}, input_regions={:?}, region_mapping={:?}, new_partition_exprs={:?})", + self.region_id, self.input_regions, self.region_mapping, self.new_partition_exprs + ) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ApplyStagingManifest { + /// The region ID to apply the staging manifest to. + pub region_id: RegionId, + /// The partition expression of the staging region. + pub partition_expr: String, + /// The region that stores the staging manifests in its staging blob storage. + pub central_region_id: RegionId, + /// The relative path to the staging manifest within the central region's staging blob storage. + pub manifest_path: String, +} + +impl Display for ApplyStagingManifest { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ApplyStagingManifest(region_id={}, partition_expr={}, central_region_id={}, manifest_path={})", + self.region_id, self.partition_expr, self.central_region_id, self.manifest_path + ) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] pub enum Instruction { /// Opens regions. @@ -559,6 +602,10 @@ pub enum Instruction { Suspend, /// Makes regions enter staging state. EnterStagingRegions(Vec), + /// Remaps manifests for a region. + RemapManifest(RemapManifest), + /// Applies staging manifests for a region. + ApplyStagingManifests(Vec), } impl Instruction { @@ -737,6 +784,48 @@ impl EnterStagingRegionsReply { } } +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct RemapManifestReply { + /// Returns false if the region does not exist. + pub exists: bool, + /// A map from region IDs to their corresponding remapped manifest paths. + pub manifest_paths: HashMap, + /// Return error if any during the operation. + pub error: Option, +} + +impl Display for RemapManifestReply { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RemapManifestReply(manifest_paths={:?}, error={:?})", + self.manifest_paths, self.error + ) + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct ApplyStagingManifestsReply { + pub replies: Vec, +} + +impl ApplyStagingManifestsReply { + pub fn new(replies: Vec) -> Self { + Self { replies } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct ApplyStagingManifestReply { + pub region_id: RegionId, + /// Returns true if the region is ready to serve reads and writes. + pub ready: bool, + /// Indicates whether the region exists. + pub exists: bool, + /// Return error if any during the operation. + pub error: Option, +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(tag = "type", rename_all = "snake_case")] pub enum InstructionReply { @@ -758,6 +847,8 @@ pub enum InstructionReply { GetFileRefs(GetFileRefsReply), GcRegions(GcRegionsReply), EnterStagingRegions(EnterStagingRegionsReply), + RemapManifest(RemapManifestReply), + ApplyStagingManifests(ApplyStagingManifestsReply), } impl Display for InstructionReply { @@ -781,6 +872,12 @@ impl Display for InstructionReply { reply.replies ) } + Self::RemapManifest(reply) => write!(f, "InstructionReply::RemapManifest({})", reply), + Self::ApplyStagingManifests(reply) => write!( + f, + "InstructionReply::ApplyStagingManifests({:?})", + reply.replies + ), } } } @@ -828,6 +925,20 @@ impl InstructionReply { _ => panic!("Expected EnterStagingRegion reply"), } } + + pub fn expect_remap_manifest_reply(self) -> RemapManifestReply { + match self { + Self::RemapManifest(reply) => reply, + _ => panic!("Expected RemapManifest reply"), + } + } + + pub fn expect_apply_staging_manifests_reply(self) -> Vec { + match self { + Self::ApplyStagingManifests(reply) => reply.replies, + _ => panic!("Expected ApplyStagingManifest reply"), + } + } } #[cfg(test)] diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 265ede339e..8747c58577 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -77,4 +77,5 @@ common-query.workspace = true common-test-util.workspace = true datafusion-common.workspace = true mito2 = { workspace = true, features = ["test"] } +partition.workspace = true session.workspace = true diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a8fe3fd969..21eda4c71a 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -201,6 +201,7 @@ pub enum Error { ShutdownServer { #[snafu(implicit)] location: Location, + #[snafu(source)] source: servers::error::Error, }, @@ -208,6 +209,7 @@ pub enum Error { ShutdownInstance { #[snafu(implicit)] location: Location, + #[snafu(source)] source: BoxedError, }, diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index a39b3787ea..8ad8f9da1b 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -22,6 +22,7 @@ use common_telemetry::error; use snafu::OptionExt; use store_api::storage::GcReport; +mod apply_staging_manifest; mod close_region; mod downgrade_region; mod enter_staging; @@ -29,8 +30,10 @@ mod file_ref; mod flush_region; mod gc_worker; mod open_region; +mod remap_manifest; mod upgrade_region; +use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler; use crate::heartbeat::handler::close_region::CloseRegionsHandler; use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler; use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler; @@ -38,6 +41,7 @@ use crate::heartbeat::handler::file_ref::GetFileRefsHandler; use crate::heartbeat::handler::flush_region::FlushRegionsHandler; use crate::heartbeat::handler::gc_worker::GcRegionsHandler; use crate::heartbeat::handler::open_region::OpenRegionsHandler; +use crate::heartbeat::handler::remap_manifest::RemapManifestHandler; use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler; use crate::heartbeat::task_tracker::TaskTracker; use crate::region_server::RegionServer; @@ -128,6 +132,10 @@ impl RegionHeartbeatResponseHandler { Instruction::EnterStagingRegions(_) => { Ok(Some(Box::new(EnterStagingRegionsHandler.into()))) } + Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))), + Instruction::ApplyStagingManifests(_) => { + Ok(Some(Box::new(ApplyStagingManifestsHandler.into()))) + } } } } @@ -142,6 +150,8 @@ pub enum InstructionHandlers { GetFileRefs(GetFileRefsHandler), GcRegions(GcRegionsHandler), EnterStagingRegions(EnterStagingRegionsHandler), + RemapManifest(RemapManifestHandler), + ApplyStagingManifests(ApplyStagingManifestsHandler), } macro_rules! impl_from_handler { @@ -164,7 +174,9 @@ impl_from_handler!( UpgradeRegionsHandler => UpgradeRegions, GetFileRefsHandler => GetFileRefs, GcRegionsHandler => GcRegions, - EnterStagingRegionsHandler => EnterStagingRegions + EnterStagingRegionsHandler => EnterStagingRegions, + RemapManifestHandler => RemapManifest, + ApplyStagingManifestsHandler => ApplyStagingManifests ); macro_rules! dispatch_instr { @@ -209,7 +221,9 @@ dispatch_instr!( UpgradeRegions => UpgradeRegions, GetFileRefs => GetFileRefs, GcRegions => GcRegions, - EnterStagingRegions => EnterStagingRegions + EnterStagingRegions => EnterStagingRegions, + RemapManifest => RemapManifest, + ApplyStagingManifests => ApplyStagingManifests, ); #[async_trait] diff --git a/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs b/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs new file mode 100644 index 0000000000..e3ee481f98 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/apply_staging_manifest.rs @@ -0,0 +1,287 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::instruction::{ + ApplyStagingManifest, ApplyStagingManifestReply, ApplyStagingManifestsReply, InstructionReply, +}; +use common_telemetry::{error, warn}; +use futures::future::join_all; +use store_api::region_request::{ApplyStagingManifestRequest, RegionRequest}; + +use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; + +pub struct ApplyStagingManifestsHandler; + +#[async_trait::async_trait] +impl InstructionHandler for ApplyStagingManifestsHandler { + type Instruction = Vec; + async fn handle( + &self, + ctx: &HandlerContext, + requests: Self::Instruction, + ) -> Option { + let results = join_all( + requests + .into_iter() + .map(|request| Self::handle_apply_staging_manifest(ctx, request)), + ) + .await; + Some(InstructionReply::ApplyStagingManifests( + ApplyStagingManifestsReply::new(results), + )) + } +} + +impl ApplyStagingManifestsHandler { + async fn handle_apply_staging_manifest( + ctx: &HandlerContext, + request: ApplyStagingManifest, + ) -> ApplyStagingManifestReply { + let Some(leader) = ctx.region_server.is_region_leader(request.region_id) else { + warn!("Region: {} is not found", request.region_id); + return ApplyStagingManifestReply { + region_id: request.region_id, + exists: false, + ready: false, + error: None, + }; + }; + if !leader { + warn!("Region: {} is not leader", request.region_id); + return ApplyStagingManifestReply { + region_id: request.region_id, + exists: true, + ready: false, + error: Some("Region is not leader".into()), + }; + } + + match ctx + .region_server + .handle_request( + request.region_id, + RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { + partition_expr: request.partition_expr, + central_region_id: request.central_region_id, + manifest_path: request.manifest_path, + }), + ) + .await + { + Ok(_) => ApplyStagingManifestReply { + region_id: request.region_id, + exists: true, + ready: true, + error: None, + }, + Err(err) => { + error!(err; "Failed to apply staging manifest"); + ApplyStagingManifestReply { + region_id: request.region_id, + exists: true, + ready: false, + error: Some(format!("{err:?}")), + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use common_meta::instruction::RemapManifest; + use datatypes::value::Value; + use mito2::config::MitoConfig; + use mito2::engine::MITO_ENGINE_NAME; + use mito2::test_util::{CreateRequestBuilder, TestEnv}; + use partition::expr::{PartitionExpr, col}; + use store_api::path_utils::table_dir; + use store_api::region_engine::RegionRole; + use store_api::region_request::EnterStagingRequest; + use store_api::storage::RegionId; + + use super::*; + use crate::heartbeat::handler::remap_manifest::RemapManifestHandler; + use crate::region_server::RegionServer; + use crate::tests::{MockRegionEngine, mock_region_server}; + + #[tokio::test] + async fn test_region_not_exist() { + let mut mock_region_server = mock_region_server(); + let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME); + mock_region_server.register_engine(mock_engine); + let handler_context = HandlerContext::new_for_test(mock_region_server); + let region_id = RegionId::new(1024, 1); + let reply = ApplyStagingManifestsHandler + .handle( + &handler_context, + vec![ApplyStagingManifest { + region_id, + partition_expr: "".to_string(), + central_region_id: RegionId::new(1024, 9999), // use a dummy value + manifest_path: "".to_string(), + }], + ) + .await + .unwrap(); + let replies = reply.expect_apply_staging_manifests_reply(); + let reply = &replies[0]; + assert!(!reply.exists); + assert!(!reply.ready); + assert!(reply.error.is_none()); + } + + #[tokio::test] + async fn test_region_not_leader() { + let mock_region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + let (mock_engine, _) = + MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| { + region_engine.mock_role = Some(Some(RegionRole::Follower)); + region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0))); + }); + mock_region_server.register_test_region(region_id, mock_engine); + let handler_context = HandlerContext::new_for_test(mock_region_server); + let region_id = RegionId::new(1024, 1); + let reply = ApplyStagingManifestsHandler + .handle( + &handler_context, + vec![ApplyStagingManifest { + region_id, + partition_expr: "".to_string(), + central_region_id: RegionId::new(1024, 2), + manifest_path: "".to_string(), + }], + ) + .await + .unwrap(); + let replies = reply.expect_apply_staging_manifests_reply(); + let reply = &replies[0]; + assert!(reply.exists); + assert!(!reply.ready); + assert!(reply.error.is_some()); + } + + fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr { + col(col_name) + .gt_eq(Value::Int64(start)) + .and(col(col_name).lt(Value::Int64(end))) + } + + async fn prepare_region(region_server: &RegionServer) { + let region_specs = [ + (RegionId::new(1024, 1), range_expr("x", 0, 49)), + (RegionId::new(1024, 2), range_expr("x", 49, 100)), + ]; + + for (region_id, partition_expr) in region_specs { + let builder = CreateRequestBuilder::new(); + let mut create_req = builder.build(); + create_req.table_dir = table_dir("test", 1024); + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + region_server + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: partition_expr.as_json_str().unwrap(), + }), + ) + .await + .unwrap(); + } + } + + #[tokio::test] + async fn test_apply_staging_manifest() { + common_telemetry::init_default_ut_logging(); + let mut region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + let mut engine_env = TestEnv::new().await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine.clone())); + prepare_region(®ion_server).await; + + let handler_context = HandlerContext::new_for_test(region_server); + let region_id2 = RegionId::new(1024, 2); + let reply = RemapManifestHandler + .handle( + &handler_context, + RemapManifest { + region_id, + input_regions: vec![region_id, region_id2], + region_mapping: HashMap::from([ + // [0,49) <- [0, 50) + (region_id, vec![region_id]), + // [49, 100) <- [0, 50), [50,100) + (region_id2, vec![region_id, region_id2]), + ]), + new_partition_exprs: HashMap::from([ + (region_id, range_expr("x", 0, 49).as_json_str().unwrap()), + (region_id2, range_expr("x", 49, 100).as_json_str().unwrap()), + ]), + }, + ) + .await + .unwrap(); + let reply = reply.expect_remap_manifest_reply(); + assert!(reply.exists); + assert!(reply.error.is_none(), "{}", reply.error.unwrap()); + assert_eq!(reply.manifest_paths.len(), 2); + let manifest_path_1 = reply.manifest_paths[®ion_id].clone(); + let manifest_path_2 = reply.manifest_paths[®ion_id2].clone(); + + let reply = ApplyStagingManifestsHandler + .handle( + &handler_context, + vec![ApplyStagingManifest { + region_id, + partition_expr: range_expr("x", 0, 49).as_json_str().unwrap(), + central_region_id: region_id, + manifest_path: manifest_path_1, + }], + ) + .await + .unwrap(); + let replies = reply.expect_apply_staging_manifests_reply(); + let reply = &replies[0]; + assert!(reply.exists); + assert!(reply.ready); + assert!(reply.error.is_none()); + + // partition expr mismatch + let reply = ApplyStagingManifestsHandler + .handle( + &handler_context, + vec![ApplyStagingManifest { + region_id: region_id2, + partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(), + central_region_id: region_id, + manifest_path: manifest_path_2, + }], + ) + .await + .unwrap(); + let replies = reply.expect_apply_staging_manifests_reply(); + let reply = &replies[0]; + assert!(reply.exists); + assert!(!reply.ready); + assert!(reply.error.is_some()); + } +} diff --git a/src/datanode/src/heartbeat/handler/remap_manifest.rs b/src/datanode/src/heartbeat/handler/remap_manifest.rs new file mode 100644 index 0000000000..3f57545659 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/remap_manifest.rs @@ -0,0 +1,246 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply}; +use common_telemetry::warn; +use store_api::region_engine::RemapManifestsRequest; + +use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; + +pub struct RemapManifestHandler; + +#[async_trait::async_trait] +impl InstructionHandler for RemapManifestHandler { + type Instruction = RemapManifest; + async fn handle( + &self, + ctx: &HandlerContext, + request: Self::Instruction, + ) -> Option { + let RemapManifest { + region_id, + input_regions, + region_mapping, + new_partition_exprs, + } = request; + let Some(leader) = ctx.region_server.is_region_leader(region_id) else { + warn!("Region: {} is not found", region_id); + return Some(InstructionReply::RemapManifest(RemapManifestReply { + exists: false, + manifest_paths: Default::default(), + error: None, + })); + }; + + if !leader { + warn!("Region: {} is not leader", region_id); + return Some(InstructionReply::RemapManifest(RemapManifestReply { + exists: true, + manifest_paths: Default::default(), + error: Some("Region is not leader".into()), + })); + } + + let reply = match ctx + .region_server + .remap_manifests(RemapManifestsRequest { + region_id, + input_regions, + region_mapping, + new_partition_exprs, + }) + .await + { + Ok(result) => InstructionReply::RemapManifest(RemapManifestReply { + exists: true, + manifest_paths: result.manifest_paths, + error: None, + }), + Err(e) => InstructionReply::RemapManifest(RemapManifestReply { + exists: true, + manifest_paths: Default::default(), + error: Some(format!("{e:?}")), + }), + }; + + Some(reply) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use common_meta::instruction::RemapManifest; + use datatypes::value::Value; + use mito2::config::MitoConfig; + use mito2::engine::MITO_ENGINE_NAME; + use mito2::test_util::{CreateRequestBuilder, TestEnv}; + use partition::expr::{PartitionExpr, col}; + use store_api::path_utils::table_dir; + use store_api::region_engine::RegionRole; + use store_api::region_request::{EnterStagingRequest, RegionRequest}; + use store_api::storage::RegionId; + + use crate::heartbeat::handler::remap_manifest::RemapManifestHandler; + use crate::heartbeat::handler::{HandlerContext, InstructionHandler}; + use crate::region_server::RegionServer; + use crate::tests::{MockRegionEngine, mock_region_server}; + + #[tokio::test] + async fn test_region_not_exist() { + let mut mock_region_server = mock_region_server(); + let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME); + mock_region_server.register_engine(mock_engine); + let handler_context = HandlerContext::new_for_test(mock_region_server); + let region_id = RegionId::new(1024, 1); + let reply = RemapManifestHandler + .handle( + &handler_context, + RemapManifest { + region_id, + input_regions: vec![], + region_mapping: HashMap::new(), + new_partition_exprs: HashMap::new(), + }, + ) + .await + .unwrap(); + let reply = &reply.expect_remap_manifest_reply(); + assert!(!reply.exists); + assert!(reply.error.is_none()); + assert!(reply.manifest_paths.is_empty()); + } + + #[tokio::test] + async fn test_region_not_leader() { + let mock_region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + let (mock_engine, _) = + MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| { + region_engine.mock_role = Some(Some(RegionRole::Follower)); + region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0))); + }); + mock_region_server.register_test_region(region_id, mock_engine); + let handler_context = HandlerContext::new_for_test(mock_region_server); + let reply = RemapManifestHandler + .handle( + &handler_context, + RemapManifest { + region_id, + input_regions: vec![], + region_mapping: HashMap::new(), + new_partition_exprs: HashMap::new(), + }, + ) + .await + .unwrap(); + let reply = reply.expect_remap_manifest_reply(); + assert!(reply.exists); + assert!(reply.error.is_some()); + } + + fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr { + col(col_name) + .gt_eq(Value::Int64(start)) + .and(col(col_name).lt(Value::Int64(end))) + } + + async fn prepare_region(region_server: &RegionServer) { + let region_specs = [ + (RegionId::new(1024, 1), range_expr("x", 0, 50)), + (RegionId::new(1024, 2), range_expr("x", 50, 100)), + ]; + + for (region_id, partition_expr) in region_specs { + let builder = CreateRequestBuilder::new(); + let mut create_req = builder.build(); + create_req.table_dir = table_dir("test", 1024); + region_server + .handle_request(region_id, RegionRequest::Create(create_req)) + .await + .unwrap(); + region_server + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: partition_expr.as_json_str().unwrap(), + }), + ) + .await + .unwrap(); + } + } + + #[tokio::test] + async fn test_remap_manifest() { + common_telemetry::init_default_ut_logging(); + let mut region_server = mock_region_server(); + let region_id = RegionId::new(1024, 1); + let mut engine_env = TestEnv::new().await; + let engine = engine_env.create_engine(MitoConfig::default()).await; + region_server.register_engine(Arc::new(engine.clone())); + prepare_region(®ion_server).await; + + let handler_context = HandlerContext::new_for_test(region_server); + let region_id2 = RegionId::new(1024, 2); + let reply = RemapManifestHandler + .handle( + &handler_context, + RemapManifest { + region_id, + input_regions: vec![region_id, region_id2], + region_mapping: HashMap::from([ + (region_id, vec![region_id]), + (region_id2, vec![region_id]), + ]), + new_partition_exprs: HashMap::from([( + region_id, + range_expr("x", 0, 100).as_json_str().unwrap(), + )]), + }, + ) + .await + .unwrap(); + let reply = reply.expect_remap_manifest_reply(); + assert!(reply.exists); + assert!(reply.error.is_none(), "{}", reply.error.unwrap()); + assert_eq!(reply.manifest_paths.len(), 1); + + // Remap failed + let reply = RemapManifestHandler + .handle( + &handler_context, + RemapManifest { + region_id, + input_regions: vec![region_id], + region_mapping: HashMap::from([ + (region_id, vec![region_id]), + (region_id2, vec![region_id]), + ]), + new_partition_exprs: HashMap::from([( + region_id, + range_expr("x", 0, 100).as_json_str().unwrap(), + )]), + }, + ) + .await + .unwrap(); + let reply = reply.expect_remap_manifest_reply(); + assert!(reply.exists); + assert!(reply.error.is_some()); + assert!(reply.manifest_paths.is_empty()); + } +} diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 0d710cc0bf..e35af7536b 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -65,8 +65,9 @@ use store_api::metric_engine_consts::{ FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, }; use store_api::region_engine::{ - RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, SetRegionRoleStateResponse, - SettableRegionRoleState, SyncRegionFromRequest, + RegionEngineRef, RegionManifestInfo, RegionRole, RegionStatistic, RemapManifestsRequest, + RemapManifestsResponse, SetRegionRoleStateResponse, SettableRegionRoleState, + SyncRegionFromRequest, }; use store_api::region_request::{ AffectedRows, BatchRegionDdlRequest, RegionCatchupRequest, RegionCloseRequest, @@ -604,6 +605,25 @@ impl RegionServer { .await } + /// Remaps manifests from old regions to new regions. + pub async fn remap_manifests( + &self, + request: RemapManifestsRequest, + ) -> Result { + let region_id = request.region_id; + let engine_with_status = self + .inner + .region_map + .get(®ion_id) + .with_context(|| RegionNotFoundSnafu { region_id })?; + + engine_with_status + .engine() + .remap_manifests(request) + .await + .with_context(|_| HandleRegionRequestSnafu { region_id }) + } + fn is_suspended(&self) -> bool { self.suspend.load(Ordering::Relaxed) } diff --git a/src/meta-srv/src/procedure/repartition/dispatch.rs b/src/meta-srv/src/procedure/repartition/dispatch.rs index cb5881ab68..d8d7b2fc7b 100644 --- a/src/meta-srv/src/procedure/repartition/dispatch.rs +++ b/src/meta-srv/src/procedure/repartition/dispatch.rs @@ -13,18 +13,41 @@ // limitations under the License. use std::any::Any; +use std::collections::HashMap; use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status}; use serde::{Deserialize, Serialize}; +use store_api::storage::RegionId; use crate::error::Result; use crate::procedure::repartition::collect::{Collect, ProcedureMeta}; use crate::procedure::repartition::group::RepartitionGroupProcedure; +use crate::procedure::repartition::plan::RegionDescriptor; use crate::procedure::repartition::{self, Context, State}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Dispatch; +#[allow(dead_code)] +fn build_region_mapping( + source_regions: &[RegionDescriptor], + target_regions: &[RegionDescriptor], + transition_map: &[Vec], +) -> HashMap> { + transition_map + .iter() + .enumerate() + .map(|(source_idx, indices)| { + let source_region = source_regions[source_idx].region_id; + let target_regions = indices + .iter() + .map(|&target_idx| target_regions[target_idx].region_id) + .collect::>(); + (source_region, target_regions) + }) + .collect::>() +} + #[async_trait::async_trait] #[typetag::serde] impl State for Dispatch { @@ -37,11 +60,19 @@ impl State for Dispatch { let mut procedures = Vec::with_capacity(ctx.persistent_ctx.plans.len()); let mut procedure_metas = Vec::with_capacity(ctx.persistent_ctx.plans.len()); for (plan_index, plan) in ctx.persistent_ctx.plans.iter().enumerate() { + let region_mapping = build_region_mapping( + &plan.source_regions, + &plan.target_regions, + &plan.transition_map, + ); let persistent_ctx = repartition::group::PersistentContext::new( plan.group_id, table_id, + ctx.persistent_ctx.catalog_name.clone(), + ctx.persistent_ctx.schema_name.clone(), plan.source_regions.clone(), plan.target_regions.clone(), + region_mapping, ); let group_procedure = RepartitionGroupProcedure::new(persistent_ctx, ctx); diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index 8648e0fef5..c4b72d02ea 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -12,27 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod apply_staging_manifest; pub(crate) mod enter_staging_region; +pub(crate) mod remap_manifest; +pub(crate) mod repartition_end; pub(crate) mod repartition_start; pub(crate) mod update_metadata; pub(crate) mod utils; use std::any::Any; +use std::collections::HashMap; use std::fmt::Debug; use std::time::Duration; use common_error::ext::BoxedError; -use common_meta::DatanodeId; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::instruction::CacheIdent; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue, RegionInfo}; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock}; +use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; +use common_procedure::error::ToJsonSnafu; use common_procedure::{ - Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, - UserMetadata, + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, + Result as ProcedureResult, Status, StringKey, UserMetadata, }; +use common_telemetry::error; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::{RegionId, TableId}; @@ -71,6 +78,12 @@ impl RepartitionGroupProcedure { } } +#[derive(Debug, Serialize)] +pub struct RepartitionGroupData<'a> { + persistent_ctx: &'a PersistentContext, + state: &'a dyn State, +} + #[async_trait::async_trait] impl Procedure for RepartitionGroupProcedure { fn type_name(&self) -> &str { @@ -78,27 +91,48 @@ impl Procedure for RepartitionGroupProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - todo!() - } + let state = &mut self.state; - async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> { - todo!() + match state.next(&mut self.context, _ctx).await { + Ok((next, status)) => { + *state = next; + Ok(status) + } + Err(e) => { + if e.is_retryable() { + Err(ProcedureError::retry_later(e)) + } else { + error!( + e; + "Repartition group procedure failed, group id: {}, table id: {}", + self.context.persistent_ctx.group_id, + self.context.persistent_ctx.table_id, + ); + Err(ProcedureError::external(e)) + } + } + } } fn rollback_supported(&self) -> bool { - true + false } fn dump(&self) -> ProcedureResult { - todo!() + let data = RepartitionGroupData { + persistent_ctx: &self.context.persistent_ctx, + state: self.state.as_ref(), + }; + serde_json::to_string(&data).context(ToJsonSnafu) } fn lock_key(&self) -> LockKey { - todo!() + LockKey::new(self.context.persistent_ctx.lock_key()) } fn user_metadata(&self) -> Option { - todo!() + // TODO(weny): support user metadata. + None } } @@ -123,8 +157,8 @@ pub struct GroupPrepareResult { pub target_routes: Vec, /// The primary source region id (first source region), used for retrieving region options. pub central_region: RegionId, - /// The datanode id where the primary source region is located. - pub central_region_datanode_id: DatanodeId, + /// The peer where the primary source region is located. + pub central_region_datanode: Peer, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -132,30 +166,59 @@ pub struct PersistentContext { pub group_id: GroupId, /// The table id of the repartition group. pub table_id: TableId, + /// The catalog name of the repartition group. + pub catalog_name: String, + /// The schema name of the repartition group. + pub schema_name: String, /// The source regions of the repartition group. pub sources: Vec, /// The target regions of the repartition group. pub targets: Vec, + /// For each `source region`, the corresponding + /// `target regions` that overlap with it. + pub region_mapping: HashMap>, /// The result of group prepare. /// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state. pub group_prepare_result: Option, + /// The staging manifest paths of the repartition group. + /// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state. + pub staging_manifest_paths: HashMap, } impl PersistentContext { pub fn new( group_id: GroupId, table_id: TableId, + catalog_name: String, + schema_name: String, sources: Vec, targets: Vec, + region_mapping: HashMap>, ) -> Self { Self { group_id, table_id, + catalog_name, + schema_name, sources, targets, + region_mapping, group_prepare_result: None, + staging_manifest_paths: HashMap::new(), } } + + pub fn lock_key(&self) -> Vec { + let mut lock_keys = Vec::with_capacity(2 + self.sources.len()); + lock_keys.extend([ + CatalogLock::Read(&self.catalog_name).into(), + SchemaLock::read(&self.catalog_name, &self.schema_name).into(), + ]); + for source in &self.sources { + lock_keys.push(RegionLock::Write(source.region_id).into()); + } + lock_keys + } } impl Context { @@ -253,7 +316,7 @@ impl Context { // Safety: prepare result is set in [RepartitionStart] state. let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap(); let central_region_datanode_table_value = self - .get_datanode_table_value(table_id, prepare_result.central_region_datanode_id) + .get_datanode_table_value(table_id, prepare_result.central_region_datanode.id) .await?; let RegionInfo { region_options, diff --git a/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs new file mode 100644 index 0000000000..0223d25794 --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group/apply_staging_manifest.rs @@ -0,0 +1,333 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use api::v1::meta::MailboxMessage; +use common_meta::instruction::{ + ApplyStagingManifestReply, ApplyStagingManifestsReply, Instruction, InstructionReply, +}; +use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::info; +use futures::future::join_all; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::storage::RegionId; + +use crate::error::{self, Error, Result}; +use crate::handler::HeartbeatMailbox; +use crate::procedure::repartition::group::update_metadata::UpdateMetadata; +use crate::procedure::repartition::group::utils::{ + HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results, +}; +use crate::procedure::repartition::group::{Context, State}; +use crate::procedure::repartition::plan::RegionDescriptor; +use crate::service::mailbox::{Channel, MailboxRef}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ApplyStagingManifest; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for ApplyStagingManifest { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + self.apply_staging_manifests(ctx).await?; + + Ok(( + Box::new(UpdateMetadata::ApplyStaging), + Status::executing(true), + )) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl ApplyStagingManifest { + fn build_apply_staging_manifest_instructions( + staging_manifest_paths: &HashMap, + target_routes: &[RegionRoute], + targets: &[RegionDescriptor], + central_region_id: RegionId, + ) -> Result>> { + let target_partition_expr_by_region = targets + .iter() + .map(|target| { + Ok(( + target.region_id, + target + .partition_expr + .as_json_str() + .context(error::SerializePartitionExprSnafu)?, + )) + }) + .collect::>>()?; + // Safety: `leader_peer` is set for all region routes, checked in `repartition_start`. + let target_region_routes_by_peer = group_region_routes_by_peer(target_routes); + let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len()); + + for (peer, region_ids) in target_region_routes_by_peer { + let apply_staging_manifests = region_ids + .into_iter() + .map(|region_id| common_meta::instruction::ApplyStagingManifest { + region_id, + partition_expr: target_partition_expr_by_region[®ion_id].clone(), + central_region_id, + manifest_path: staging_manifest_paths[®ion_id].clone(), + }) + .collect(); + instructions.insert(peer.clone(), apply_staging_manifests); + } + + Ok(instructions) + } + + #[allow(dead_code)] + async fn apply_staging_manifests(&self, ctx: &mut Context) -> Result<()> { + let table_id = ctx.persistent_ctx.table_id; + let group_id = ctx.persistent_ctx.group_id; + let staging_manifest_paths = &ctx.persistent_ctx.staging_manifest_paths; + // Safety: the group prepare result is set in the RepartitionStart state. + let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); + let targets = &ctx.persistent_ctx.targets; + let target_routes = &prepare_result.target_routes; + let central_region_id = prepare_result.central_region; + let instructions = Self::build_apply_staging_manifest_instructions( + staging_manifest_paths, + target_routes, + targets, + central_region_id, + )?; + let operation_timeout = + ctx.next_operation_timeout() + .context(error::ExceededDeadlineSnafu { + operation: "Apply staging manifests", + })?; + + let (peers, tasks): (Vec<_>, Vec<_>) = instructions + .iter() + .map(|(peer, apply_staging_manifests)| { + ( + peer, + Self::apply_staging_manifest( + &ctx.mailbox, + &ctx.server_addr, + peer, + apply_staging_manifests, + operation_timeout, + ), + ) + }) + .unzip(); + info!( + "Sent apply staging manifests instructions to peers: {:?} for repartition table {}, group id {}", + peers, table_id, group_id + ); + + let format_err_msg = |idx: usize, error: &Error| { + let peer = peers[idx]; + format!( + "Failed to apply staging manifests on datanode {:?}, error: {:?}", + peer, error + ) + }; + // Waits for all tasks to complete. + let results = join_all(tasks).await; + let result = handle_multiple_results(&results); + match result { + HandleMultipleResult::AllSuccessful => Ok(()), + HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu { + reason: format!( + "All retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}", + table_id, group_id, + retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(",") + ), + } + .fail(), + HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu { + violated: format!( + "All non retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}", + table_id, group_id, + non_retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(",") + ), + } + .fail(), + HandleMultipleResult::PartialRetryable { + retryable_errors, + non_retryable_errors, + } => error::UnexpectedSnafu { + violated: format!( + "Partial retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}, non retryable errors: {:?}", + table_id, group_id, + retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(","), + non_retryable_errors + .iter() + .map(|(idx, error)| format_err_msg(*idx, error)) + .collect::>() + .join(","), + ), + } + .fail(), + } + } + + async fn apply_staging_manifest( + mailbox: &MailboxRef, + server_addr: &str, + peer: &Peer, + apply_staging_manifests: &[common_meta::instruction::ApplyStagingManifest], + timeout: Duration, + ) -> Result<()> { + let ch = Channel::Datanode(peer.id); + let instruction = Instruction::ApplyStagingManifests(apply_staging_manifests.to_vec()); + let message = MailboxMessage::json_message( + &format!( + "Apply staging manifests for regions: {:?}", + apply_staging_manifests + .iter() + .map(|r| r.region_id) + .collect::>() + ), + &format!("Metasrv@{}", server_addr), + &format!("Datanode-{}@{}", peer.id, peer.addr), + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + + let now = Instant::now(); + let receiver = mailbox.send(&ch, message, timeout).await; + + let receiver = match receiver { + Ok(receiver) => receiver, + Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu { + reason: format!( + "Pusher not found for apply staging manifests on datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ), + } + .fail()?, + Err(err) => { + return Err(err); + } + }; + + match receiver.await { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received apply staging manifests reply: {:?}, elapsed: {:?}", + reply, + now.elapsed() + ); + let InstructionReply::ApplyStagingManifests(ApplyStagingManifestsReply { replies }) = + reply + else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect apply staging manifests reply", + } + .fail(); + }; + for reply in replies { + Self::handle_apply_staging_manifest_reply(&reply, &now, peer)?; + } + + Ok(()) + } + Err(error::Error::MailboxTimeout { .. }) => { + let reason = format!( + "Mailbox received timeout for apply staging manifests on datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ); + error::RetryLaterSnafu { reason }.fail() + } + Err(err) => Err(err), + } + } + + fn handle_apply_staging_manifest_reply( + ApplyStagingManifestReply { + region_id, + ready, + exists, + error, + }: &ApplyStagingManifestReply, + now: &Instant, + peer: &Peer, + ) -> Result<()> { + ensure!( + exists, + error::UnexpectedSnafu { + violated: format!( + "Region {} doesn't exist on datanode {:?}, elapsed: {:?}", + region_id, + peer, + now.elapsed() + ) + } + ); + + if error.is_some() { + return error::RetryLaterSnafu { + reason: format!( + "Failed to apply staging manifest on datanode {:?}, error: {:?}, elapsed: {:?}", + peer, + error, + now.elapsed() + ), + } + .fail(); + } + + ensure!( + ready, + error::RetryLaterSnafu { + reason: format!( + "Region {} is still applying staging manifest on datanode {:?}, elapsed: {:?}", + region_id, + peer, + now.elapsed() + ), + } + ); + + Ok(()) + } +} diff --git a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs index dbbcc4283c..d8f30b21bd 100644 --- a/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/enter_staging_region.rs @@ -29,6 +29,7 @@ use snafu::{OptionExt, ResultExt, ensure}; use crate::error::{self, Error, Result}; use crate::handler::HeartbeatMailbox; +use crate::procedure::repartition::group::remap_manifest::RemapManifest; use crate::procedure::repartition::group::utils::{ HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results, }; @@ -49,7 +50,7 @@ impl State for EnterStagingRegion { ) -> Result<(Box, Status)> { self.enter_staging_regions(ctx).await?; - Ok(Self::next_state()) + Ok((Box::new(RemapManifest), Status::executing(true))) } fn as_any(&self) -> &dyn Any { @@ -58,16 +59,10 @@ impl State for EnterStagingRegion { } impl EnterStagingRegion { - #[allow(dead_code)] - fn next_state() -> (Box, Status) { - // TODO(weny): change it later. - (Box::new(EnterStagingRegion), Status::executing(true)) - } - fn build_enter_staging_instructions( prepare_result: &GroupPrepareResult, targets: &[RegionDescriptor], - ) -> Result> { + ) -> Result>> { let target_partition_expr_by_region = targets .iter() .map(|target| { @@ -93,10 +88,7 @@ impl EnterStagingRegion { partition_expr: target_partition_expr_by_region[®ion_id].clone(), }) .collect(); - instructions.insert( - peer.clone(), - Instruction::EnterStagingRegions(enter_staging_regions), - ); + instructions.insert(peer.clone(), enter_staging_regions); } Ok(instructions) @@ -117,14 +109,14 @@ impl EnterStagingRegion { })?; let (peers, tasks): (Vec<_>, Vec<_>) = instructions .iter() - .map(|(peer, instruction)| { + .map(|(peer, enter_staging_regions)| { ( peer, Self::enter_staging_region( &ctx.mailbox, &ctx.server_addr, peer, - instruction, + enter_staging_regions, operation_timeout, ), ) @@ -208,12 +200,19 @@ impl EnterStagingRegion { mailbox: &MailboxRef, server_addr: &str, peer: &Peer, - instruction: &Instruction, + enter_staging_regions: &[common_meta::instruction::EnterStagingRegion], timeout: Duration, ) -> Result<()> { let ch = Channel::Datanode(peer.id); + let instruction = Instruction::EnterStagingRegions(enter_staging_regions.to_vec()); let message = MailboxMessage::json_message( - &format!("Enter staging regions: {:?}", instruction), + &format!( + "Enter staging regions: {:?}", + enter_staging_regions + .iter() + .map(|r| r.region_id) + .collect::>() + ), &format!("Metasrv@{}", server_addr), &format!("Datanode-{}@{}", peer.id, peer.addr), common_time::util::current_time_millis(), @@ -328,7 +327,6 @@ mod tests { use std::assert_matches::assert_matches; use std::time::Duration; - use common_meta::instruction::Instruction; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use store_api::storage::RegionId; @@ -376,7 +374,7 @@ mod tests { }, ], central_region: RegionId::new(table_id, 1), - central_region_datanode_id: 1, + central_region_datanode: Peer::empty(1), }; let targets = test_targets(); let instructions = @@ -384,12 +382,7 @@ mod tests { .unwrap(); assert_eq!(instructions.len(), 2); - let instruction_1 = instructions - .get(&Peer::empty(1)) - .unwrap() - .clone() - .into_enter_staging_regions() - .unwrap(); + let instruction_1 = instructions.get(&Peer::empty(1)).unwrap().clone(); assert_eq!( instruction_1, vec![common_meta::instruction::EnterStagingRegion { @@ -397,12 +390,7 @@ mod tests { partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), }] ); - let instruction_2 = instructions - .get(&Peer::empty(2)) - .unwrap() - .clone() - .into_enter_staging_regions() - .unwrap(); + let instruction_2 = instructions.get(&Peer::empty(2)).unwrap().clone(); assert_eq!( instruction_2, vec![common_meta::instruction::EnterStagingRegion { @@ -417,18 +405,17 @@ mod tests { let env = TestingEnv::new(); let server_addr = "localhost"; let peer = Peer::empty(1); - let instruction = - Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion { - region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), - }]); + let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + }]; let timeout = Duration::from_secs(10); let err = EnterStagingRegion::enter_staging_region( env.mailbox_ctx.mailbox(), server_addr, &peer, - &instruction, + &enter_staging_regions, timeout, ) .await @@ -447,11 +434,10 @@ mod tests { .await; let server_addr = "localhost"; let peer = Peer::empty(1); - let instruction = - Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion { - region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), - }]); + let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + }]; let timeout = Duration::from_secs(10); // Sends a timeout error. @@ -463,7 +449,7 @@ mod tests { env.mailbox_ctx.mailbox(), server_addr, &peer, - &instruction, + &enter_staging_regions, timeout, ) .await @@ -479,11 +465,10 @@ mod tests { let server_addr = "localhost"; let peer = Peer::empty(1); - let instruction = - Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion { - region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), - }]); + let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + }]; let timeout = Duration::from_secs(10); env.mailbox_ctx @@ -498,7 +483,7 @@ mod tests { env.mailbox_ctx.mailbox(), server_addr, &peer, - &instruction, + &enter_staging_regions, timeout, ) .await @@ -516,11 +501,10 @@ mod tests { .await; let server_addr = "localhost"; let peer = Peer::empty(1); - let instruction = - Instruction::EnterStagingRegions(vec![common_meta::instruction::EnterStagingRegion { - region_id: RegionId::new(1024, 1), - partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), - }]); + let enter_staging_regions = vec![common_meta::instruction::EnterStagingRegion { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 10).as_json_str().unwrap(), + }]; let timeout = Duration::from_secs(10); // Sends a failed reply. @@ -538,7 +522,7 @@ mod tests { env.mailbox_ctx.mailbox(), server_addr, &peer, - &instruction, + &enter_staging_regions, timeout, ) .await @@ -565,7 +549,7 @@ mod tests { env.mailbox_ctx.mailbox(), server_addr, &peer, - &instruction, + &enter_staging_regions, timeout, ) .await @@ -596,7 +580,7 @@ mod tests { }, ], central_region: RegionId::new(table_id, 1), - central_region_datanode_id: 1, + central_region_datanode: Peer::empty(1), } } diff --git a/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs b/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs new file mode 100644 index 0000000000..af4b3bcf45 --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group/remap_manifest.rs @@ -0,0 +1,222 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use api::v1::meta::MailboxMessage; +use common_meta::instruction::{Instruction, InstructionReply, RemapManifestReply}; +use common_meta::peer::Peer; +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::{info, warn}; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, ensure}; +use store_api::storage::RegionId; + +use crate::error::{self, Result}; +use crate::handler::HeartbeatMailbox; +use crate::procedure::repartition::group::apply_staging_manifest::ApplyStagingManifest; +use crate::procedure::repartition::group::{Context, State}; +use crate::procedure::repartition::plan::RegionDescriptor; +use crate::service::mailbox::{Channel, MailboxRef}; + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct RemapManifest; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for RemapManifest { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap(); + let remap = Self::build_remap_manifest_instructions( + &ctx.persistent_ctx.sources, + &ctx.persistent_ctx.targets, + &ctx.persistent_ctx.region_mapping, + prepare_result.central_region, + )?; + let operation_timeout = + ctx.next_operation_timeout() + .context(error::ExceededDeadlineSnafu { + operation: "Remap manifests", + })?; + let manifest_paths = Self::remap_manifests( + &ctx.mailbox, + &ctx.server_addr, + &prepare_result.central_region_datanode, + &remap, + operation_timeout, + ) + .await?; + let table_id = ctx.persistent_ctx.table_id; + let group_id = ctx.persistent_ctx.group_id; + + if manifest_paths.len() != ctx.persistent_ctx.targets.len() { + warn!( + "Mismatch in manifest paths count: expected {}, got {}. This occurred during remapping manifests for group {} and table {}.", + ctx.persistent_ctx.targets.len(), + manifest_paths.len(), + group_id, + table_id + ); + } + + ctx.persistent_ctx.staging_manifest_paths = manifest_paths; + + Ok((Box::new(ApplyStagingManifest), Status::executing(true))) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl RemapManifest { + fn build_remap_manifest_instructions( + source_regions: &[RegionDescriptor], + target_regions: &[RegionDescriptor], + region_mapping: &HashMap>, + central_region_id: RegionId, + ) -> Result { + let new_partition_exprs = target_regions + .iter() + .map(|r| { + Ok(( + r.region_id, + r.partition_expr + .as_json_str() + .context(error::SerializePartitionExprSnafu)?, + )) + }) + .collect::>>()?; + + Ok(common_meta::instruction::RemapManifest { + region_id: central_region_id, + input_regions: source_regions.iter().map(|r| r.region_id).collect(), + region_mapping: region_mapping.clone(), + new_partition_exprs, + }) + } + + async fn remap_manifests( + mailbox: &MailboxRef, + server_addr: &str, + peer: &Peer, + remap: &common_meta::instruction::RemapManifest, + timeout: Duration, + ) -> Result> { + let ch = Channel::Datanode(peer.id); + let instruction = Instruction::RemapManifest(remap.clone()); + let message = MailboxMessage::json_message( + &format!( + "Remap manifests, central region: {}, input regions: {:?}", + remap.region_id, remap.input_regions + ), + &format!("Metasrv@{}", server_addr), + &format!("Datanode-{}@{}", peer.id, peer.addr), + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| error::SerializeToJsonSnafu { + input: instruction.to_string(), + })?; + let now = Instant::now(); + let receiver = mailbox.send(&ch, message, timeout).await; + + let receiver = match receiver { + Ok(receiver) => receiver, + Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu { + reason: format!( + "Pusher not found for remap manifests on datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ), + } + .fail()?, + Err(err) => { + return Err(err); + } + }; + + match receiver.await { + Ok(msg) => { + let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received remap manifest reply: {:?}, elapsed: {:?}", + reply, + now.elapsed() + ); + let InstructionReply::RemapManifest(reply) = reply else { + return error::UnexpectedInstructionReplySnafu { + mailbox_message: msg.to_string(), + reason: "expect remap manifest reply", + } + .fail(); + }; + + Self::handle_remap_manifest_reply(remap.region_id, reply, &now, peer) + } + Err(error::Error::MailboxTimeout { .. }) => { + let reason = format!( + "Mailbox received timeout for remap manifests on datanode {:?}, elapsed: {:?}", + peer, + now.elapsed() + ); + error::RetryLaterSnafu { reason }.fail() + } + Err(err) => Err(err), + } + } + + fn handle_remap_manifest_reply( + region_id: RegionId, + RemapManifestReply { + exists, + manifest_paths, + error, + }: RemapManifestReply, + now: &Instant, + peer: &Peer, + ) -> Result> { + ensure!( + exists, + error::UnexpectedSnafu { + violated: format!( + "Region {} doesn't exist on datanode {:?}, elapsed: {:?}", + region_id, + peer, + now.elapsed() + ) + } + ); + + if error.is_some() { + return error::RetryLaterSnafu { + reason: format!( + "Failed to remap manifest on datanode {:?}, error: {:?}, elapsed: {:?}", + peer, + error, + now.elapsed() + ), + } + .fail(); + } + + Ok(manifest_paths) + } +} diff --git a/src/meta-srv/src/procedure/repartition/group/repartition_end.rs b/src/meta-srv/src/procedure/repartition/group/repartition_end.rs new file mode 100644 index 0000000000..74469861b4 --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group/repartition_end.rs @@ -0,0 +1,40 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use common_procedure::{Context as ProcedureContext, Status}; +use serde::{Deserialize, Serialize}; + +use crate::error::Result; +use crate::procedure::repartition::group::{Context, State}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RepartitionEnd; + +#[async_trait::async_trait] +#[typetag::serde] +impl State for RepartitionEnd { + async fn next( + &mut self, + _ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + Ok((Box::new(RepartitionEnd), Status::done())) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs index 7890021206..e261119f7c 100644 --- a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs @@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use crate::error::{self, Result}; +use crate::procedure::repartition::group::update_metadata::UpdateMetadata; use crate::procedure::repartition::group::{ Context, GroupId, GroupPrepareResult, State, region_routes, }; @@ -109,7 +110,7 @@ impl RepartitionStart { ); } let central_region = sources[0].region_id; - let central_region_datanode_id = source_region_routes[0] + let central_region_datanode = source_region_routes[0] .leader_peer .as_ref() .context(error::UnexpectedSnafu { @@ -118,20 +119,22 @@ impl RepartitionStart { central_region ), })? - .id; + .clone(); Ok(GroupPrepareResult { source_routes: source_region_routes, target_routes: target_region_routes, central_region, - central_region_datanode_id, + central_region_datanode, }) } #[allow(dead_code)] fn next_state() -> (Box, Status) { - // TODO(weny): change it later. - (Box::new(RepartitionStart), Status::executing(true)) + ( + Box::new(UpdateMetadata::ApplyStaging), + Status::executing(true), + ) } } diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata.rs index 8f42ff8432..ecd9a93046 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata.rs @@ -17,12 +17,14 @@ pub(crate) mod rollback_staging_region; use std::any::Any; +use common_meta::lock_key::TableLock; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::warn; use serde::{Deserialize, Serialize}; use crate::error::Result; -use crate::procedure::repartition::group::repartition_start::RepartitionStart; +use crate::procedure::repartition::group::enter_staging_region::EnterStagingRegion; +use crate::procedure::repartition::group::repartition_end::RepartitionEnd; use crate::procedure::repartition::group::{Context, State}; #[derive(Debug, Serialize, Deserialize)] @@ -33,22 +35,16 @@ pub enum UpdateMetadata { RollbackStaging, } -impl UpdateMetadata { - #[allow(dead_code)] - fn next_state() -> (Box, Status) { - // TODO(weny): change it later. - (Box::new(RepartitionStart), Status::executing(true)) - } -} - #[async_trait::async_trait] #[typetag::serde] impl State for UpdateMetadata { async fn next( &mut self, ctx: &mut Context, - _procedure_ctx: &ProcedureContext, + procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { + let table_lock = TableLock::Write(ctx.persistent_ctx.table_id).into(); + let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; match self { UpdateMetadata::ApplyStaging => { // TODO(weny): If all metadata have already been updated, skip applying staging regions. @@ -59,7 +55,7 @@ impl State for UpdateMetadata { "Failed to broadcast the invalidate table cache message during the apply staging regions, error: {err:?}" ); }; - Ok(Self::next_state()) + Ok((Box::new(EnterStagingRegion), Status::executing(false))) } UpdateMetadata::RollbackStaging => { self.rollback_staging_regions(ctx).await?; @@ -69,7 +65,7 @@ impl State for UpdateMetadata { "Failed to broadcast the invalidate table cache message during the rollback staging regions, error: {err:?}" ); }; - Ok(Self::next_state()) + Ok((Box::new(RepartitionEnd), Status::executing(false))) } } } diff --git a/src/meta-srv/src/procedure/repartition/plan.rs b/src/meta-srv/src/procedure/repartition/plan.rs index b548a5a877..9d22531a1d 100644 --- a/src/meta-srv/src/procedure/repartition/plan.rs +++ b/src/meta-srv/src/procedure/repartition/plan.rs @@ -41,6 +41,9 @@ pub struct AllocationPlanEntry { pub regions_to_allocate: usize, /// The number of regions that need to be deallocated (source count - target count, if positive). pub regions_to_deallocate: usize, + /// For each `source_regions[k]`, the corresponding vector contains global + /// `target_partition_exprs` that overlap with it. + pub transition_map: Vec>, } /// A plan entry for the dispatch phase after region allocation, @@ -57,6 +60,9 @@ pub struct RepartitionPlanEntry { pub allocated_region_ids: Vec, /// The region ids of the regions that are pending deallocation. pub pending_deallocate_region_ids: Vec, + /// For each `source_regions[k]`, the corresponding vector contains global + /// `target_regions` that overlap with it. + pub transition_map: Vec>, } impl RepartitionPlanEntry { @@ -71,6 +77,7 @@ impl RepartitionPlanEntry { target_partition_exprs, regions_to_allocate, regions_to_deallocate, + transition_map, }: &AllocationPlanEntry, ) -> Self { debug_assert!(*regions_to_allocate == 0 && *regions_to_deallocate == 0); @@ -89,6 +96,7 @@ impl RepartitionPlanEntry { target_regions, allocated_region_ids: vec![], pending_deallocate_region_ids: vec![], + transition_map: transition_map.clone(), } } } diff --git a/src/meta-srv/src/procedure/repartition/repartition_start.rs b/src/meta-srv/src/procedure/repartition/repartition_start.rs index 9244c8ff76..f9bed479c5 100644 --- a/src/meta-srv/src/procedure/repartition/repartition_start.rs +++ b/src/meta-srv/src/procedure/repartition/repartition_start.rs @@ -129,6 +129,7 @@ impl RepartitionStart { target_partition_exprs, regions_to_allocate, regions_to_deallocate, + transition_map: subtask.transition_map, } }) .collect::>() diff --git a/src/meta-srv/src/procedure/repartition/test_util.rs b/src/meta-srv/src/procedure/repartition/test_util.rs index 771fe3d8e4..35fdeee759 100644 --- a/src/meta-srv/src/procedure/repartition/test_util.rs +++ b/src/meta-srv/src/procedure/repartition/test_util.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; @@ -87,9 +88,13 @@ pub fn new_persistent_context( ) -> PersistentContext { PersistentContext { group_id: Uuid::new_v4(), + catalog_name: "test_catalog".to_string(), + schema_name: "test_schema".to_string(), table_id, sources, targets, + region_mapping: HashMap::new(), group_prepare_result: None, + staging_manifest_paths: HashMap::new(), } }