From e44323c43377cdcaab82e97f1d76d5538baf78b8 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 27 Nov 2025 12:57:45 +0800 Subject: [PATCH] feat: add region repartition group procedure infrastructure (#7299) * feat: add region repartition group procedure infrastructure Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 1 + src/common/meta/src/kv_backend.rs | 2 + src/common/meta/src/kv_backend/test_util.rs | 125 +++++++ src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/error.rs | 53 ++- .../src/handler/region_lease_handler.rs | 75 +---- src/meta-srv/src/procedure.rs | 1 + src/meta-srv/src/procedure/repartition.rs | 19 ++ .../src/procedure/repartition/group.rs | 154 +++++++++ .../repartition/group/repartition_start.rs | 304 ++++++++++++++++++ .../src/procedure/repartition/plan.rs | 26 ++ .../src/procedure/repartition/test_util.rs | 74 +++++ 12 files changed, 771 insertions(+), 64 deletions(-) create mode 100644 src/common/meta/src/kv_backend/test_util.rs create mode 100644 src/meta-srv/src/procedure/repartition.rs create mode 100644 src/meta-srv/src/procedure/repartition/group.rs create mode 100644 src/meta-srv/src/procedure/repartition/group/repartition_start.rs create mode 100644 src/meta-srv/src/procedure/repartition/plan.rs create mode 100644 src/meta-srv/src/procedure/repartition/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index 6e3980e8fa..b4a520fe9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7445,6 +7445,7 @@ dependencies = [ "once_cell", "ordered-float 4.6.0", "parking_lot 0.12.4", + "partition", "prometheus", "prost 0.13.5", "rand 0.9.1", diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index cdd7102e11..7f747508d4 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -34,6 +34,8 @@ pub mod memory; #[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))] pub mod rds; pub mod test; +#[cfg(any(test, feature = "testing"))] +pub mod test_util; pub mod txn; pub mod util; pub type KvBackendRef = Arc + Send + Sync>; diff --git a/src/common/meta/src/kv_backend/test_util.rs b/src/common/meta/src/kv_backend/test_util.rs new file mode 100644 index 0000000000..ce502c3332 --- /dev/null +++ b/src/common/meta/src/kv_backend/test_util.rs @@ -0,0 +1,125 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::sync::Arc; + +use derive_builder::Builder; + +use crate::error::Result; +use crate::kv_backend::txn::{Txn, TxnResponse}; +use crate::kv_backend::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, KvBackend, PutRequest, PutResponse, + RangeRequest, RangeResponse, TxnService, +}; + +pub type MockFn = Arc Result + Send + Sync>; + +/// A mock kv backend for testing. +#[derive(Builder)] +pub struct MockKvBackend { + #[builder(setter(strip_option), default)] + pub range_fn: Option>, + #[builder(setter(strip_option), default)] + pub put_fn: Option>, + #[builder(setter(strip_option), default)] + pub batch_put_fn: Option>, + #[builder(setter(strip_option), default)] + pub batch_get_fn: Option>, + #[builder(setter(strip_option), default)] + pub delete_range_fn: Option>, + #[builder(setter(strip_option), default)] + pub batch_delete_fn: Option>, + #[builder(setter(strip_option), default)] + pub txn: Option>, + #[builder(setter(strip_option), default)] + pub max_txn_ops: Option, +} + +#[async_trait::async_trait] +impl TxnService for MockKvBackend { + type Error = crate::error::Error; + + async fn txn(&self, txn: Txn) -> Result { + if let Some(f) = &self.txn { + f(txn) + } else { + unimplemented!() + } + } + + fn max_txn_ops(&self) -> usize { + self.max_txn_ops.unwrap() + } +} + +#[async_trait::async_trait] +impl KvBackend for MockKvBackend { + fn name(&self) -> &str { + "mock_kv_backend" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, req: RangeRequest) -> Result { + if let Some(f) = &self.range_fn { + f(req) + } else { + unimplemented!() + } + } + + async fn put(&self, req: PutRequest) -> Result { + if let Some(f) = &self.put_fn { + f(req) + } else { + unimplemented!() + } + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + if let Some(f) = &self.batch_put_fn { + f(req) + } else { + unimplemented!() + } + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + if let Some(f) = &self.batch_get_fn { + f(req) + } else { + unimplemented!() + } + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + if let Some(f) = &self.delete_range_fn { + f(req) + } else { + unimplemented!() + } + } + + async fn batch_delete(&self, req: BatchDeleteRequest) -> Result { + if let Some(f) = &self.batch_delete_fn { + f(req) + } else { + unimplemented!() + } + } +} diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 2465272122..3ed3c5a834 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -64,6 +64,7 @@ lazy_static.workspace = true once_cell.workspace = true ordered-float.workspace = true parking_lot.workspace = true +partition.workspace = true prometheus.workspace = true prost.workspace = true rand.workspace = true diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 557fb196d6..f00ccdeb3a 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -23,6 +23,7 @@ use store_api::storage::RegionId; use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; +use uuid::Uuid; use crate::metasrv::SelectTarget; use crate::pubsub::Message; @@ -982,6 +983,52 @@ pub enum Error { #[snafu(source)] source: common_meta::error::Error, }, + + #[snafu(display( + "Repartition group {} source region missing, region id: {}", + group_id, + region_id + ))] + RepartitionSourceRegionMissing { + group_id: Uuid, + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Repartition group {} target region missing, region id: {}", + group_id, + region_id + ))] + RepartitionTargetRegionMissing { + group_id: Uuid, + region_id: RegionId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to serialize partition expression: {}", source))] + SerializePartitionExpr { + #[snafu(source)] + source: partition::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Partition expression mismatch, region id: {}, expected: {}, actual: {}", + region_id, + expected, + actual + ))] + PartitionExprMismatch { + region_id: RegionId, + expected: String, + actual: String, + #[snafu(implicit)] + location: Location, + }, } impl Error { @@ -1041,6 +1088,7 @@ impl ErrorExt for Error { | Error::MailboxChannelClosed { .. } | Error::IsNotLeader { .. } => StatusCode::IllegalState, Error::RetryLaterWithSource { source, .. } => source.status_code(), + Error::SerializePartitionExpr { source, .. } => source.status_code(), Error::Unsupported { .. } => StatusCode::Unsupported, @@ -1062,7 +1110,10 @@ impl ErrorExt for Error { | Error::TooManyPartitions { .. } | Error::TomlFormat { .. } | Error::HandlerNotFound { .. } - | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments, + | Error::LeaderPeerChanged { .. } + | Error::RepartitionSourceRegionMissing { .. } + | Error::RepartitionTargetRegionMissing { .. } + | Error::PartitionExprMismatch { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } | Error::InvalidRegionKeyFromUtf8 { .. } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 4483b0ecdd..d0e9757742 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -129,27 +129,20 @@ impl HeartbeatHandler for RegionLeaseHandler { #[cfg(test)] mod test { - use std::any::Any; + use std::collections::{HashMap, HashSet}; use std::sync::Arc; use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; use common_meta::distributed_time_constants; - use common_meta::error::Result as MetaResult; use common_meta::key::TableMetadataManager; use common_meta::key::table_route::TableRouteValue; use common_meta::key::test_utils::new_test_table_info; use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::kv_backend::txn::{Txn, TxnResponse}; - use common_meta::kv_backend::{KvBackend, TxnService}; + use common_meta::kv_backend::test_util::MockKvBackendBuilder; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; - use common_meta::rpc::store::{ - BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, - BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, - PutResponse, RangeRequest, RangeResponse, - }; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; @@ -425,63 +418,19 @@ mod test { assert_eq!(granted, expected); } - struct MockKvBackend; - - #[async_trait::async_trait] - impl TxnService for MockKvBackend { - type Error = common_meta::error::Error; - - async fn txn(&self, _txn: Txn) -> MetaResult { - unimplemented!() - } - - fn max_txn_ops(&self) -> usize { - unimplemented!() - } - } - - #[async_trait::async_trait] - impl KvBackend for MockKvBackend { - fn name(&self) -> &str { - "mock_kv_backend" - } - - fn as_any(&self) -> &dyn Any { - self - } - - async fn range(&self, _req: RangeRequest) -> MetaResult { - unimplemented!() - } - - async fn put(&self, _req: PutRequest) -> MetaResult { - unimplemented!() - } - - async fn batch_put(&self, _req: BatchPutRequest) -> MetaResult { - unimplemented!() - } - - async fn batch_get(&self, _req: BatchGetRequest) -> MetaResult { - common_meta::error::UnexpectedSnafu { - err_msg: "mock err", - } - .fail() - } - - async fn delete_range(&self, _req: DeleteRangeRequest) -> MetaResult { - unimplemented!() - } - - async fn batch_delete(&self, _req: BatchDeleteRequest) -> MetaResult { - unimplemented!() - } - } - #[tokio::test] async fn test_handle_renew_region_lease_failure() { common_telemetry::init_default_ut_logging(); - let kvbackend = Arc::new(MockKvBackend); + let kv = MockKvBackendBuilder::default() + .batch_get_fn(Arc::new(|_| { + common_meta::error::UnexpectedSnafu { + err_msg: "mock err", + } + .fail() + }) as _) + .build() + .unwrap(); + let kvbackend = Arc::new(kv); let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend)); let datanode_id = 1; diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index 88869d8482..da1a1b00e7 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef; use snafu::ResultExt; pub mod region_migration; +pub mod repartition; #[cfg(any(test, feature = "testing"))] pub mod test_util; #[cfg(test)] diff --git a/src/meta-srv/src/procedure/repartition.rs b/src/meta-srv/src/procedure/repartition.rs new file mode 100644 index 0000000000..f55d349df5 --- /dev/null +++ b/src/meta-srv/src/procedure/repartition.rs @@ -0,0 +1,19 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod group; +pub mod plan; + +#[cfg(test)] +pub mod test_util; diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs new file mode 100644 index 0000000000..2048e28398 --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -0,0 +1,154 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod repartition_start; + +use std::any::Any; +use std::fmt::Debug; + +use common_error::ext::BoxedError; +use common_meta::key::table_route::TableRouteValue; +use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use common_meta::rpc::router::RegionRoute; +use common_procedure::{Context as ProcedureContext, Status}; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{RegionId, TableId}; +use uuid::Uuid; + +use crate::error::{self, Result}; +use crate::procedure::repartition::plan::RegionDescriptor; + +pub type GroupId = Uuid; + +pub struct RepartitionGroupProcedure {} + +pub struct Context { + pub persistent_ctx: PersistentContext, + + pub table_metadata_manager: TableMetadataManagerRef, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct GroupPrepareResult { + pub source_routes: Vec, + pub target_routes: Vec, + pub central_region: RegionId, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct PersistentContext { + pub group_id: GroupId, + /// The table id of the repartition group. + pub table_id: TableId, + /// The source regions of the repartition group. + pub sources: Vec, + /// The target regions of the repartition group. + pub targets: Vec, + /// The result of group prepare. + /// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state. + pub group_prepare_result: Option, +} + +impl Context { + /// Retrieves the table route value for the given table id. + /// + /// Retry: + /// - Failed to retrieve the metadata of table. + /// + /// Abort: + /// - Table route not found. + pub async fn get_table_route_value( + &self, + ) -> Result> { + let table_id = self.persistent_ctx.table_id; + let group_id = self.persistent_ctx.group_id; + let table_route_value = self + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get_with_raw_bytes(table_id) + .await + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!( + "Failed to get table route for table: {}, repartition group: {}", + table_id, group_id + ), + })? + .context(error::TableRouteNotFoundSnafu { table_id })?; + + Ok(table_route_value) + } +} + +#[async_trait::async_trait] +#[typetag::serde(tag = "repartition_group_state")] +pub(crate) trait State: Sync + Send + Debug { + fn name(&self) -> &'static str { + let type_name = std::any::type_name::(); + // short name + type_name.split("::").last().unwrap_or(type_name) + } + + /// Yields the next [State] and [Status]. + async fn next( + &mut self, + ctx: &mut Context, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)>; + + fn as_any(&self) -> &dyn Any; +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use common_meta::key::TableMetadataManager; + use common_meta::kv_backend::test_util::MockKvBackendBuilder; + + use crate::error::Error; + use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context}; + + #[tokio::test] + async fn test_get_table_route_value_not_found_error() { + let env = TestingEnv::new(); + let persistent_context = new_persistent_context(1024, vec![], vec![]); + let ctx = env.create_context(persistent_context); + let err = ctx.get_table_route_value().await.unwrap_err(); + assert_matches!(err, Error::TableRouteNotFound { .. }); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn test_get_table_route_value_retry_error() { + let kv = MockKvBackendBuilder::default() + .range_fn(Arc::new(|_| { + common_meta::error::UnexpectedSnafu { + err_msg: "mock err", + } + .fail() + })) + .build() + .unwrap(); + let mut env = TestingEnv::new(); + env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv))); + let persistent_context = new_persistent_context(1024, vec![], vec![]); + let ctx = env.create_context(persistent_context); + let err = ctx.get_table_route_value().await.unwrap_err(); + assert!(err.is_retryable()); + } +} diff --git a/src/meta-srv/src/procedure/repartition/group/repartition_start.rs b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs new file mode 100644 index 0000000000..7cdf620721 --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/group/repartition_start.rs @@ -0,0 +1,304 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::HashMap; + +use common_meta::rpc::router::RegionRoute; +use common_procedure::{Context as ProcedureContext, Status}; +use common_telemetry::debug; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt, ensure}; + +use crate::error::{self, Result}; +use crate::procedure::repartition::group::{Context, GroupId, GroupPrepareResult, State}; +use crate::procedure::repartition::plan::RegionDescriptor; + +#[derive(Debug, Serialize, Deserialize)] +pub struct RepartitionStart; + +/// Ensures that the partition expression of the region route matches the partition expression of the region descriptor. +fn ensure_region_route_expr_match( + region_route: &RegionRoute, + region_descriptor: &RegionDescriptor, +) -> Result { + let actual = ®ion_route.region.partition_expr; + let expected = region_descriptor + .partition_expr + .as_json_str() + .context(error::SerializePartitionExprSnafu)?; + ensure!( + actual == &expected, + error::PartitionExprMismatchSnafu { + region_id: region_route.region.id, + expected, + actual, + } + ); + Ok(region_route.clone()) +} + +impl RepartitionStart { + /// Ensures that both source and target regions are present in the region routes. + /// + /// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning). + #[allow(dead_code)] + fn ensure_route_present( + group_id: GroupId, + region_routes: &[RegionRoute], + sources: &[RegionDescriptor], + targets: &[RegionDescriptor], + ) -> Result { + ensure!( + !sources.is_empty(), + error::UnexpectedSnafu { + violated: "Sources are empty" + } + ); + + let central_region = sources[0].region_id; + let region_routes_map = region_routes + .iter() + .map(|r| (r.region.id, r)) + .collect::>(); + let source_region_routes = sources + .iter() + .map(|s| { + region_routes_map + .get(&s.region_id) + .context(error::RepartitionSourceRegionMissingSnafu { + group_id, + region_id: s.region_id, + }) + .and_then(|r| ensure_region_route_expr_match(r, s)) + }) + .collect::>>()?; + let target_region_routes = targets + .iter() + .map(|t| { + region_routes_map + .get(&t.region_id) + .context(error::RepartitionTargetRegionMissingSnafu { + group_id, + region_id: t.region_id, + }) + .and_then(|r| ensure_region_route_expr_match(r, t)) + }) + .collect::>>()?; + + Ok(GroupPrepareResult { + source_routes: source_region_routes, + target_routes: target_region_routes, + central_region, + }) + } + + #[allow(dead_code)] + fn next_state() -> (Box, Status) { + // TODO(weny): change it later. + (Box::new(RepartitionStart), Status::executing(true)) + } +} + +#[async_trait::async_trait] +#[typetag::serde] +impl State for RepartitionStart { + /// Captures the group prepare result. + /// + /// Retry: + /// - Failed to get the table route. + /// + /// Abort + /// - Table route not found. + /// - Table route is not physical. + /// - Failed to ensure the route is present. + /// - Failed to capture the group prepare result. + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + if ctx.persistent_ctx.group_prepare_result.is_some() { + return Ok(Self::next_state()); + } + let table_id = ctx.persistent_ctx.table_id; + let group_id = ctx.persistent_ctx.group_id; + let table_route_value = ctx.get_table_route_value().await?.into_inner(); + let region_routes = table_route_value.region_routes().with_context(|_| { + error::UnexpectedLogicalRouteTableSnafu { + err_msg: format!( + "TableRoute({:?}) is a non-physical TableRouteValue.", + table_id + ), + } + })?; + let group_prepare_result = Self::ensure_route_present( + group_id, + region_routes, + &ctx.persistent_ctx.sources, + &ctx.persistent_ctx.targets, + )?; + ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result); + debug!( + "Repartition group {}: captured {} sources, {} targets", + group_id, + ctx.persistent_ctx.sources.len(), + ctx.persistent_ctx.targets.len() + ); + + Ok(Self::next_state()) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::RegionId; + use uuid::Uuid; + + use crate::error::Error; + use crate::procedure::repartition::group::repartition_start::RepartitionStart; + use crate::procedure::repartition::plan::RegionDescriptor; + use crate::procedure::repartition::test_util::range_expr; + + #[test] + fn test_ensure_route_present_missing_source_region() { + let source_region = RegionDescriptor { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 100), + }; + let target_region = RegionDescriptor { + region_id: RegionId::new(1024, 2), + partition_expr: range_expr("x", 0, 10), + }; + let region_routes = vec![RegionRoute { + region: Region { + id: RegionId::new(1024, 2), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]; + let err = RepartitionStart::ensure_route_present( + Uuid::new_v4(), + ®ion_routes, + &[source_region], + &[target_region], + ) + .unwrap_err(); + assert_matches!(err, Error::RepartitionSourceRegionMissing { .. }); + } + + #[test] + fn test_ensure_route_present_partition_expr_mismatch() { + let source_region = RegionDescriptor { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 100), + }; + let target_region = RegionDescriptor { + region_id: RegionId::new(1024, 2), + partition_expr: range_expr("x", 0, 10), + }; + let region_routes = vec![RegionRoute { + region: Region { + id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]; + let err = RepartitionStart::ensure_route_present( + Uuid::new_v4(), + ®ion_routes, + &[source_region], + &[target_region], + ) + .unwrap_err(); + assert_matches!(err, Error::PartitionExprMismatch { .. }); + + let source_region = RegionDescriptor { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 100), + }; + let target_region = RegionDescriptor { + region_id: RegionId::new(1024, 2), + partition_expr: range_expr("x", 0, 10), + }; + let region_routes = vec![ + RegionRoute { + region: Region { + id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + RegionRoute { + region: Region { + id: RegionId::new(1024, 2), + partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }, + ]; + let err = RepartitionStart::ensure_route_present( + Uuid::new_v4(), + ®ion_routes, + &[source_region], + &[target_region], + ) + .unwrap_err(); + assert_matches!(err, Error::PartitionExprMismatch { .. }); + } + + #[test] + fn test_ensure_route_present_missing_target_region() { + let source_region = RegionDescriptor { + region_id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 100), + }; + let target_region = RegionDescriptor { + region_id: RegionId::new(1024, 2), + partition_expr: range_expr("x", 0, 10), + }; + let region_routes = vec![RegionRoute { + region: Region { + id: RegionId::new(1024, 1), + partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(), + ..Default::default() + }, + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]; + let err = RepartitionStart::ensure_route_present( + Uuid::new_v4(), + ®ion_routes, + &[source_region], + &[target_region], + ) + .unwrap_err(); + assert_matches!(err, Error::RepartitionTargetRegionMissing { .. }); + } +} diff --git a/src/meta-srv/src/procedure/repartition/plan.rs b/src/meta-srv/src/procedure/repartition/plan.rs new file mode 100644 index 0000000000..8529bc77e9 --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/plan.rs @@ -0,0 +1,26 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use partition::expr::PartitionExpr; +use serde::{Deserialize, Serialize}; +use store_api::storage::RegionId; + +/// Metadata describing a region involved in the plan. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RegionDescriptor { + /// The region id of the region involved in the plan. + pub region_id: RegionId, + /// The partition expression of the region. + pub partition_expr: PartitionExpr, +} diff --git a/src/meta-srv/src/procedure/repartition/test_util.rs b/src/meta-srv/src/procedure/repartition/test_util.rs new file mode 100644 index 0000000000..e615dea4ea --- /dev/null +++ b/src/meta-srv/src/procedure/repartition/test_util.rs @@ -0,0 +1,74 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::memory::MemoryKvBackend; +use datatypes::value::Value; +use partition::expr::{PartitionExpr, col}; +use store_api::storage::TableId; +use uuid::Uuid; + +use crate::procedure::repartition::group::{Context, PersistentContext}; +use crate::procedure::repartition::plan::RegionDescriptor; + +/// `TestingEnv` provides components during the tests. +pub struct TestingEnv { + pub table_metadata_manager: TableMetadataManagerRef, +} + +impl Default for TestingEnv { + fn default() -> Self { + Self::new() + } +} + +impl TestingEnv { + pub fn new() -> Self { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + + Self { + table_metadata_manager, + } + } + + pub fn create_context(self, persistent_context: PersistentContext) -> Context { + Context { + persistent_ctx: persistent_context, + table_metadata_manager: self.table_metadata_manager.clone(), + } + } +} + +pub fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr { + col(col_name) + .gt_eq(Value::Int64(start)) + .and(col(col_name).lt(Value::Int64(end))) +} + +pub fn new_persistent_context( + table_id: TableId, + sources: Vec, + targets: Vec, +) -> PersistentContext { + PersistentContext { + group_id: Uuid::new_v4(), + table_id, + sources, + targets, + group_prepare_result: None, + } +}