feat: add region repartition group procedure infrastructure (#7299)

* feat: add region repartition group procedure infrastructure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-11-27 12:57:45 +08:00
committed by GitHub
parent 0aeaf405c7
commit e44323c433
12 changed files with 771 additions and 64 deletions

1
Cargo.lock generated
View File

@@ -7445,6 +7445,7 @@ dependencies = [
"once_cell",
"ordered-float 4.6.0",
"parking_lot 0.12.4",
"partition",
"prometheus",
"prost 0.13.5",
"rand 0.9.1",

View File

@@ -34,6 +34,8 @@ pub mod memory;
#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
pub mod rds;
pub mod test;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub mod txn;
pub mod util;
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;

View File

@@ -0,0 +1,125 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use derive_builder::Builder;
use crate::error::Result;
use crate::kv_backend::txn::{Txn, TxnResponse};
use crate::kv_backend::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, KvBackend, PutRequest, PutResponse,
RangeRequest, RangeResponse, TxnService,
};
pub type MockFn<Req, Resp> = Arc<dyn Fn(Req) -> Result<Resp> + Send + Sync>;
/// A mock kv backend for testing.
#[derive(Builder)]
pub struct MockKvBackend {
#[builder(setter(strip_option), default)]
pub range_fn: Option<MockFn<RangeRequest, RangeResponse>>,
#[builder(setter(strip_option), default)]
pub put_fn: Option<MockFn<PutRequest, PutResponse>>,
#[builder(setter(strip_option), default)]
pub batch_put_fn: Option<MockFn<BatchPutRequest, BatchPutResponse>>,
#[builder(setter(strip_option), default)]
pub batch_get_fn: Option<MockFn<BatchGetRequest, BatchGetResponse>>,
#[builder(setter(strip_option), default)]
pub delete_range_fn: Option<MockFn<DeleteRangeRequest, DeleteRangeResponse>>,
#[builder(setter(strip_option), default)]
pub batch_delete_fn: Option<MockFn<BatchDeleteRequest, BatchDeleteResponse>>,
#[builder(setter(strip_option), default)]
pub txn: Option<MockFn<Txn, TxnResponse>>,
#[builder(setter(strip_option), default)]
pub max_txn_ops: Option<usize>,
}
#[async_trait::async_trait]
impl TxnService for MockKvBackend {
type Error = crate::error::Error;
async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
if let Some(f) = &self.txn {
f(txn)
} else {
unimplemented!()
}
}
fn max_txn_ops(&self) -> usize {
self.max_txn_ops.unwrap()
}
}
#[async_trait::async_trait]
impl KvBackend for MockKvBackend {
fn name(&self) -> &str {
"mock_kv_backend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
if let Some(f) = &self.range_fn {
f(req)
} else {
unimplemented!()
}
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
if let Some(f) = &self.put_fn {
f(req)
} else {
unimplemented!()
}
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
if let Some(f) = &self.batch_put_fn {
f(req)
} else {
unimplemented!()
}
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
if let Some(f) = &self.batch_get_fn {
f(req)
} else {
unimplemented!()
}
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
if let Some(f) = &self.delete_range_fn {
f(req)
} else {
unimplemented!()
}
}
async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
if let Some(f) = &self.batch_delete_fn {
f(req)
} else {
unimplemented!()
}
}
}

View File

@@ -64,6 +64,7 @@ lazy_static.workspace = true
once_cell.workspace = true
ordered-float.workspace = true
parking_lot.workspace = true
partition.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true

View File

@@ -23,6 +23,7 @@ use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::mpsc::error::SendError;
use tonic::codegen::http;
use uuid::Uuid;
use crate::metasrv::SelectTarget;
use crate::pubsub::Message;
@@ -982,6 +983,52 @@ pub enum Error {
#[snafu(source)]
source: common_meta::error::Error,
},
#[snafu(display(
"Repartition group {} source region missing, region id: {}",
group_id,
region_id
))]
RepartitionSourceRegionMissing {
group_id: Uuid,
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Repartition group {} target region missing, region id: {}",
group_id,
region_id
))]
RepartitionTargetRegionMissing {
group_id: Uuid,
region_id: RegionId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to serialize partition expression: {}", source))]
SerializePartitionExpr {
#[snafu(source)]
source: partition::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Partition expression mismatch, region id: {}, expected: {}, actual: {}",
region_id,
expected,
actual
))]
PartitionExprMismatch {
region_id: RegionId,
expected: String,
actual: String,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -1041,6 +1088,7 @@ impl ErrorExt for Error {
| Error::MailboxChannelClosed { .. }
| Error::IsNotLeader { .. } => StatusCode::IllegalState,
Error::RetryLaterWithSource { source, .. } => source.status_code(),
Error::SerializePartitionExpr { source, .. } => source.status_code(),
Error::Unsupported { .. } => StatusCode::Unsupported,
@@ -1062,7 +1110,10 @@ impl ErrorExt for Error {
| Error::TooManyPartitions { .. }
| Error::TomlFormat { .. }
| Error::HandlerNotFound { .. }
| Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
| Error::LeaderPeerChanged { .. }
| Error::RepartitionSourceRegionMissing { .. }
| Error::RepartitionTargetRegionMissing { .. }
| Error::PartitionExprMismatch { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::InvalidRegionKeyFromUtf8 { .. }

View File

@@ -129,27 +129,20 @@ impl HeartbeatHandler for RegionLeaseHandler {
#[cfg(test)]
mod test {
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use common_meta::distributed_time_constants;
use common_meta::error::Result as MetaResult;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::kv_backend::test_util::MockKvBackendBuilder;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
@@ -425,63 +418,19 @@ mod test {
assert_eq!(granted, expected);
}
struct MockKvBackend;
#[async_trait::async_trait]
impl TxnService for MockKvBackend {
type Error = common_meta::error::Error;
async fn txn(&self, _txn: Txn) -> MetaResult<TxnResponse> {
unimplemented!()
}
fn max_txn_ops(&self) -> usize {
unimplemented!()
}
}
#[async_trait::async_trait]
impl KvBackend for MockKvBackend {
fn name(&self) -> &str {
"mock_kv_backend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, _req: RangeRequest) -> MetaResult<RangeResponse> {
unimplemented!()
}
async fn put(&self, _req: PutRequest) -> MetaResult<PutResponse> {
unimplemented!()
}
async fn batch_put(&self, _req: BatchPutRequest) -> MetaResult<BatchPutResponse> {
unimplemented!()
}
async fn batch_get(&self, _req: BatchGetRequest) -> MetaResult<BatchGetResponse> {
common_meta::error::UnexpectedSnafu {
err_msg: "mock err",
}
.fail()
}
async fn delete_range(&self, _req: DeleteRangeRequest) -> MetaResult<DeleteRangeResponse> {
unimplemented!()
}
async fn batch_delete(&self, _req: BatchDeleteRequest) -> MetaResult<BatchDeleteResponse> {
unimplemented!()
}
}
#[tokio::test]
async fn test_handle_renew_region_lease_failure() {
common_telemetry::init_default_ut_logging();
let kvbackend = Arc::new(MockKvBackend);
let kv = MockKvBackendBuilder::default()
.batch_get_fn(Arc::new(|_| {
common_meta::error::UnexpectedSnafu {
err_msg: "mock err",
}
.fail()
}) as _)
.build()
.unwrap();
let kvbackend = Arc::new(kv);
let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend));
let datanode_id = 1;

View File

@@ -19,6 +19,7 @@ use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
pub mod region_migration;
pub mod repartition;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]

View File

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod group;
pub mod plan;
#[cfg(test)]
pub mod test_util;

View File

@@ -0,0 +1,154 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod repartition_start;
use std::any::Any;
use std::fmt::Debug;
use common_error::ext::BoxedError;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;
use crate::error::{self, Result};
use crate::procedure::repartition::plan::RegionDescriptor;
pub type GroupId = Uuid;
pub struct RepartitionGroupProcedure {}
pub struct Context {
pub persistent_ctx: PersistentContext,
pub table_metadata_manager: TableMetadataManagerRef,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {
pub source_routes: Vec<RegionRoute>,
pub target_routes: Vec<RegionRoute>,
pub central_region: RegionId,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
pub group_id: GroupId,
/// The table id of the repartition group.
pub table_id: TableId,
/// The source regions of the repartition group.
pub sources: Vec<RegionDescriptor>,
/// The target regions of the repartition group.
pub targets: Vec<RegionDescriptor>,
/// The result of group prepare.
/// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state.
pub group_prepare_result: Option<GroupPrepareResult>,
}
impl Context {
/// Retrieves the table route value for the given table id.
///
/// Retry:
/// - Failed to retrieve the metadata of table.
///
/// Abort:
/// - Table route not found.
pub async fn get_table_route_value(
&self,
) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
let table_id = self.persistent_ctx.table_id;
let group_id = self.persistent_ctx.group_id;
let table_route_value = self
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get_with_raw_bytes(table_id)
.await
.map_err(BoxedError::new)
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!(
"Failed to get table route for table: {}, repartition group: {}",
table_id, group_id
),
})?
.context(error::TableRouteNotFoundSnafu { table_id })?;
Ok(table_route_value)
}
}
#[async_trait::async_trait]
#[typetag::serde(tag = "repartition_group_state")]
pub(crate) trait State: Sync + Send + Debug {
fn name(&self) -> &'static str {
let type_name = std::any::type_name::<Self>();
// short name
type_name.split("::").last().unwrap_or(type_name)
}
/// Yields the next [State] and [Status].
async fn next(
&mut self,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)>;
fn as_any(&self) -> &dyn Any;
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::test_util::MockKvBackendBuilder;
use crate::error::Error;
use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
#[tokio::test]
async fn test_get_table_route_value_not_found_error() {
let env = TestingEnv::new();
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let ctx = env.create_context(persistent_context);
let err = ctx.get_table_route_value().await.unwrap_err();
assert_matches!(err, Error::TableRouteNotFound { .. });
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_get_table_route_value_retry_error() {
let kv = MockKvBackendBuilder::default()
.range_fn(Arc::new(|_| {
common_meta::error::UnexpectedSnafu {
err_msg: "mock err",
}
.fail()
}))
.build()
.unwrap();
let mut env = TestingEnv::new();
env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
let persistent_context = new_persistent_context(1024, vec![], vec![]);
let ctx = env.create_context(persistent_context);
let err = ctx.get_table_route_value().await.unwrap_err();
assert!(err.is_retryable());
}
}

View File

@@ -0,0 +1,304 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::debug;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use crate::error::{self, Result};
use crate::procedure::repartition::group::{Context, GroupId, GroupPrepareResult, State};
use crate::procedure::repartition::plan::RegionDescriptor;
#[derive(Debug, Serialize, Deserialize)]
pub struct RepartitionStart;
/// Ensures that the partition expression of the region route matches the partition expression of the region descriptor.
fn ensure_region_route_expr_match(
region_route: &RegionRoute,
region_descriptor: &RegionDescriptor,
) -> Result<RegionRoute> {
let actual = &region_route.region.partition_expr;
let expected = region_descriptor
.partition_expr
.as_json_str()
.context(error::SerializePartitionExprSnafu)?;
ensure!(
actual == &expected,
error::PartitionExprMismatchSnafu {
region_id: region_route.region.id,
expected,
actual,
}
);
Ok(region_route.clone())
}
impl RepartitionStart {
/// Ensures that both source and target regions are present in the region routes.
///
/// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
#[allow(dead_code)]
fn ensure_route_present(
group_id: GroupId,
region_routes: &[RegionRoute],
sources: &[RegionDescriptor],
targets: &[RegionDescriptor],
) -> Result<GroupPrepareResult> {
ensure!(
!sources.is_empty(),
error::UnexpectedSnafu {
violated: "Sources are empty"
}
);
let central_region = sources[0].region_id;
let region_routes_map = region_routes
.iter()
.map(|r| (r.region.id, r))
.collect::<HashMap<_, _>>();
let source_region_routes = sources
.iter()
.map(|s| {
region_routes_map
.get(&s.region_id)
.context(error::RepartitionSourceRegionMissingSnafu {
group_id,
region_id: s.region_id,
})
.and_then(|r| ensure_region_route_expr_match(r, s))
})
.collect::<Result<Vec<_>>>()?;
let target_region_routes = targets
.iter()
.map(|t| {
region_routes_map
.get(&t.region_id)
.context(error::RepartitionTargetRegionMissingSnafu {
group_id,
region_id: t.region_id,
})
.and_then(|r| ensure_region_route_expr_match(r, t))
})
.collect::<Result<Vec<_>>>()?;
Ok(GroupPrepareResult {
source_routes: source_region_routes,
target_routes: target_region_routes,
central_region,
})
}
#[allow(dead_code)]
fn next_state() -> (Box<dyn State>, Status) {
// TODO(weny): change it later.
(Box::new(RepartitionStart), Status::executing(true))
}
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for RepartitionStart {
/// Captures the group prepare result.
///
/// Retry:
/// - Failed to get the table route.
///
/// Abort
/// - Table route not found.
/// - Table route is not physical.
/// - Failed to ensure the route is present.
/// - Failed to capture the group prepare result.
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
if ctx.persistent_ctx.group_prepare_result.is_some() {
return Ok(Self::next_state());
}
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
let table_route_value = ctx.get_table_route_value().await?.into_inner();
let region_routes = table_route_value.region_routes().with_context(|_| {
error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!(
"TableRoute({:?}) is a non-physical TableRouteValue.",
table_id
),
}
})?;
let group_prepare_result = Self::ensure_route_present(
group_id,
region_routes,
&ctx.persistent_ctx.sources,
&ctx.persistent_ctx.targets,
)?;
ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
debug!(
"Repartition group {}: captured {} sources, {} targets",
group_id,
ctx.persistent_ctx.sources.len(),
ctx.persistent_ctx.targets.len()
);
Ok(Self::next_state())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::error::Error;
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::test_util::range_expr;
#[test]
fn test_ensure_route_present_missing_source_region() {
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(1024, 2),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let err = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
)
.unwrap_err();
assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
}
#[test]
fn test_ensure_route_present_partition_expr_mismatch() {
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let err = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
)
.unwrap_err();
assert_matches!(err, Error::PartitionExprMismatch { .. });
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![
RegionRoute {
region: Region {
id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
RegionRoute {
region: Region {
id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 5).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
},
];
let err = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
)
.unwrap_err();
assert_matches!(err, Error::PartitionExprMismatch { .. });
}
#[test]
fn test_ensure_route_present_missing_target_region() {
let source_region = RegionDescriptor {
region_id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100),
};
let target_region = RegionDescriptor {
region_id: RegionId::new(1024, 2),
partition_expr: range_expr("x", 0, 10),
};
let region_routes = vec![RegionRoute {
region: Region {
id: RegionId::new(1024, 1),
partition_expr: range_expr("x", 0, 100).as_json_str().unwrap(),
..Default::default()
},
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
let err = RepartitionStart::ensure_route_present(
Uuid::new_v4(),
&region_routes,
&[source_region],
&[target_region],
)
.unwrap_err();
assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
}
}

View File

@@ -0,0 +1,26 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use store_api::storage::RegionId;
/// Metadata describing a region involved in the plan.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegionDescriptor {
/// The region id of the region involved in the plan.
pub region_id: RegionId,
/// The partition expression of the region.
pub partition_expr: PartitionExpr,
}

View File

@@ -0,0 +1,74 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
use store_api::storage::TableId;
use uuid::Uuid;
use crate::procedure::repartition::group::{Context, PersistentContext};
use crate::procedure::repartition::plan::RegionDescriptor;
/// `TestingEnv` provides components during the tests.
pub struct TestingEnv {
pub table_metadata_manager: TableMetadataManagerRef,
}
impl Default for TestingEnv {
fn default() -> Self {
Self::new()
}
}
impl TestingEnv {
pub fn new() -> Self {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
Self {
table_metadata_manager,
}
}
pub fn create_context(self, persistent_context: PersistentContext) -> Context {
Context {
persistent_ctx: persistent_context,
table_metadata_manager: self.table_metadata_manager.clone(),
}
}
}
pub fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
col(col_name)
.gt_eq(Value::Int64(start))
.and(col(col_name).lt(Value::Int64(end)))
}
pub fn new_persistent_context(
table_id: TableId,
sources: Vec<RegionDescriptor>,
targets: Vec<RegionDescriptor>,
) -> PersistentContext {
PersistentContext {
group_id: Uuid::new_v4(),
table_id,
sources,
targets,
group_prepare_result: None,
}
}