feat(meta-srv): support repartition for unpartitioned tables (#8186)

* feat(meta-srv): update repartition partition metadata

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(meta-srv): connect repartition source metadata

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test(meta-srv): cover unpartitioned repartition rollback

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: connect unpartitioned repartition SQL path

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-05-28 11:31:55 +08:00
committed by GitHub
parent bf7e3551fe
commit 91ac84019b
6 changed files with 1344 additions and 77 deletions

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::alter_table_expr::Kind;
use api::v1::repartition::Source;
use api::v1::repartition::Source as PbRepartitionSource;
use api::v1::{PartitionExprs, Repartition};
use common_error::ext::BoxedError;
use common_procedure::{
@@ -49,7 +49,7 @@ use crate::error::{
self, CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu, UnexpectedSnafu, WaitProcedureSnafu,
UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
@@ -152,13 +152,18 @@ macro_rules! procedure_loader {
pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
/// Describes what a repartition request starts from.
///
/// The value is handed to [`RepartitionProcedureFactory::create`], which
/// turns it into the procedure-specific source representation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RepartitionSource {
    /// The table is already partitioned; `exprs` carries the serialized
    /// source partition expressions.
    Partitioned { exprs: Vec<String> },
    /// The table has no partition rule yet; `partition_columns` names the
    /// columns the new partition rule is built on.
    Unpartitioned { partition_columns: Vec<String> },
}
pub trait RepartitionProcedureFactory: Send + Sync {
fn create(
&self,
ddl_ctx: &DdlContext,
table_name: TableName,
table_id: TableId,
from_exprs: Vec<String>,
source: RepartitionSource,
to_exprs: Vec<String>,
timeout: Option<Duration>,
) -> std::result::Result<BoxedProcedure, BoxedError>;
@@ -290,18 +295,19 @@ impl DdlManager {
let into_partition_exprs = repartition.into_partition_exprs;
let source = repartition.source;
let from_partition_exprs = match source {
Some(Source::PartitionExprs(PartitionExprs { exprs })) => exprs,
Some(Source::Unpartitioned(_)) => {
return UnexpectedSnafu {
err_msg: "Unpartitioned repartition source is not supported yet".to_string(),
}
.fail();
let source = match source {
Some(PbRepartitionSource::PartitionExprs(PartitionExprs { exprs })) => {
RepartitionSource::Partitioned { exprs }
}
Some(PbRepartitionSource::Unpartitioned(source)) => RepartitionSource::Unpartitioned {
partition_columns: source.partition_columns,
},
None => {
// Reads the deprecated field for backward compatibility with old persisted DDL tasks.
#[allow(deprecated)]
repartition.from_partition_exprs
RepartitionSource::Partitioned {
exprs: repartition.from_partition_exprs,
}
}
};
@@ -311,7 +317,7 @@ impl DdlManager {
&context,
table_name,
table_id,
from_partition_exprs,
source,
into_partition_exprs,
Some(timeout),
)
@@ -1124,7 +1130,7 @@ mod tests {
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use crate::ddl_manager::RepartitionProcedureFactory;
use crate::ddl_manager::{RepartitionProcedureFactory, RepartitionSource};
use crate::key::TableMetadataManager;
use crate::key::flow::FlowMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -1162,7 +1168,7 @@ mod tests {
_ddl_ctx: &DdlContext,
_table_name: TableName,
_table_id: TableId,
_from_exprs: Vec<String>,
_source: RepartitionSource,
_to_exprs: Vec<String>,
_timeout: Option<Duration>,
) -> std::result::Result<BoxedProcedure, BoxedError> {

View File

@@ -20,6 +20,7 @@ pub mod group;
pub mod plan;
pub mod repartition_end;
pub mod repartition_start;
pub mod update_partition_metadata;
pub mod utils;
use std::any::Any;
@@ -32,7 +33,7 @@ use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::DdlContext;
use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
use common_meta::ddl_manager::RepartitionProcedureFactory;
use common_meta::ddl_manager::{RepartitionProcedureFactory, RepartitionSource};
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::RegionInfo;
use common_meta::key::table_info::TableInfoValue;
@@ -62,7 +63,8 @@ use crate::procedure::repartition::group::{
Context as RepartitionGroupContext, RepartitionGroupProcedure, region_routes,
};
use crate::procedure::repartition::plan::RepartitionPlanEntry;
use crate::procedure::repartition::repartition_start::RepartitionStart;
use crate::procedure::repartition::repartition_start::{RepartitionFrom, RepartitionStart};
use crate::procedure::repartition::update_partition_metadata::PartitionMetadataUpdate;
use crate::procedure::repartition::utils::{
get_datanode_table_value, rollback_group_metadata_routes,
};
@@ -93,6 +95,9 @@ pub struct PersistentContext {
/// The timeout for repartition operations.
#[serde(with = "humantime_serde", default = "default_timeout")]
pub timeout: Duration,
#[serde(default)]
/// Records table-level partition metadata added by this repartition.
pub partition_metadata_update: Option<PartitionMetadataUpdate>,
}
fn default_timeout() -> Duration {
@@ -121,6 +126,7 @@ impl PersistentContext {
failed_procedures: vec![],
unknown_procedures: vec![],
timeout: timeout.unwrap_or_else(default_timeout),
partition_metadata_update: None,
}
}
@@ -317,7 +323,9 @@ impl Context {
///
/// Abort:
/// - Table info not found.
pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
pub async fn get_raw_table_info_value(
&self,
) -> Result<DeserializedValueWithBytes<TableInfoValue>> {
let table_id = self.persistent_ctx.table_id;
let table_info_value = self
.table_metadata_manager
@@ -328,11 +336,36 @@ impl Context {
.with_context(|_| error::RetryLaterWithSourceSnafu {
reason: format!("Failed to get table info for table: {}", table_id),
})?
.context(error::TableInfoNotFoundSnafu { table_id })?
.into_inner();
.context(error::TableInfoNotFoundSnafu { table_id })?;
Ok(table_info_value)
}
/// Returns the decoded table info value of the table being repartitioned.
///
/// This is the raw value from `get_raw_table_info_value` with the byte
/// wrapper stripped; any error from that call is propagated unchanged.
pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
    let raw = self.get_raw_table_info_value().await?;
    Ok(raw.into_inner())
}
/// Writes the table info carried by `new_table_info_value` for this
/// procedure's table.
///
/// `current_table_info_value` is the raw value read earlier; it is passed to
/// the table metadata manager together with the new table info.
///
/// Any failure of the metadata manager is wrapped in a
/// `RetryLaterWithSource` error.
pub async fn update_table_info(
    &self,
    current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
    new_table_info_value: TableInfoValue,
) -> Result<()> {
    let table_id = self.persistent_ctx.table_id;
    let new_table_info = new_table_info_value.table_info;

    let outcome = self
        .table_metadata_manager
        .update_table_info(current_table_info_value, None, new_table_info)
        .await;

    outcome
        .map_err(BoxedError::new)
        .with_context(|_| error::RetryLaterWithSourceSnafu {
            reason: format!("Failed to update table info for table: {}", table_id),
        })
}
/// Updates the table route.
///
/// Retry:
@@ -469,12 +502,8 @@ struct RepartitionDataOwned {
impl RepartitionProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
pub fn new(
from_exprs: Vec<PartitionExpr>,
to_exprs: Vec<PartitionExpr>,
context: Context,
) -> Self {
let state = Box::new(RepartitionStart::new(from_exprs, to_exprs));
/// Creates a repartition procedure whose initial state is a
/// [`RepartitionStart`] built from `from` and `to_exprs`.
pub fn new(from: RepartitionFrom, to_exprs: Vec<PartitionExpr>, context: Context) -> Self {
    let initial_state = RepartitionStart::new(from, to_exprs);
    Self {
        state: Box::new(initial_state),
        context,
    }
}
@@ -492,24 +521,24 @@ impl RepartitionProcedure {
Ok(Self { state, context })
}
/// Returns whether parent rollback should remove this repartition's allocated regions.
/// Returns whether parent rollback should run.
///
/// This uses an "after AllocateRegion" semantic: once execution reaches
/// `AllocateRegion` or any later state, rollback must try to remove this round's
/// `allocated_region_ids` from table-route metadata when they exist.
///
/// State flow:
/// `RepartitionStart -> AllocateRegion -> Dispatch -> Collect -> DeallocateRegion -> RepartitionEnd`
/// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/// rollback allocated regions in metadata
/// This uses an "after repartition metadata update" semantic: once execution
/// reaches `UpdatePartitionMetadata` or any later rollback-active state,
/// rollback must try to clean metadata written by the repartition procedure.
///
/// Notes:
/// - `RepartitionStart`: no-op, because allocation has not happened yet.
/// - `AllocateRegion` / `Dispatch` / `Collect` rollback-active.
/// - `RepartitionStart`: no-op, because no metadata has been updated yet.
/// - `UpdatePartitionMetadata`: rollback table partition metadata.
/// - `AllocateRegion` / `Dispatch` / `Collect`: rollback table partition metadata
/// and allocated region metadata.
/// - `DeallocateRegion`: not rollback-active.
/// - `RepartitionEnd`: no-op.
fn should_rollback_allocated_regions(&self) -> bool {
self.state.as_any().is::<allocate_region::AllocateRegion>()
fn should_rollback(&self) -> bool {
    // Resolve the dynamic state once, then test it against every state
    // type that is treated as rollback-active.
    let current = self.state.as_any();
    current.is::<update_partition_metadata::UpdatePartitionMetadata>()
        || current.is::<allocate_region::AllocateRegion>()
        || current.is::<dispatch::Dispatch>()
        || current.is::<collect::Collect>()
}
@@ -526,7 +555,7 @@ impl RepartitionProcedure {
/// Returns allocated region ids that parent rollback should remove.
///
/// Rollback uses an "after AllocateRegion" semantic:
/// Rollback uses an "after region allocation" semantic:
/// - in `AllocateRegion` and `Dispatch`, all allocated regions belong to the
/// current repartition attempt and must be cleaned up.
/// - in `Collect`, only the plans referenced by failed or unknown
@@ -586,8 +615,47 @@ impl RepartitionProcedure {
Ok(())
}
/// Removes the partition key indices recorded in
/// `persistent_ctx.partition_metadata_update` from the table info.
///
/// This is a no-op when no update was recorded, when the recorded index
/// list is empty, or when none of the recorded indices is present in the
/// current table info.
async fn rollback_partition_metadata(&mut self) -> Result<()> {
    let recorded = self.context.persistent_ctx.partition_metadata_update.as_ref();
    let added_indices = match recorded {
        Some(update) if !update.partition_key_indices.is_empty() => {
            &update.partition_key_indices
        }
        _ => return Ok(()),
    };

    let current = self.context.get_raw_table_info_value().await?;
    let remaining_indices = current
        .table_info
        .meta
        .partition_key_indices
        .iter()
        .filter(|idx| !added_indices.contains(*idx))
        .copied()
        .collect::<Vec<_>>();
    if remaining_indices == current.table_info.meta.partition_key_indices {
        // Nothing recorded by this repartition is present in the table info.
        return Ok(());
    }

    let mut restored_table_info = current.table_info.clone();
    restored_table_info.meta.partition_key_indices = remaining_indices;
    let restored_value = current.update(restored_table_info);
    self.context
        .update_table_info(&current, restored_value)
        .await?;

    // Do not invalidate the table cache here. The table routes may still
    // contain partition expressions until `rollback_inner` rolls them back.
    // Exposing cleared partition columns with partitioned routes can build
    // an inconsistent partition rule. The cache is invalidated once after
    // both partition metadata and routes are rolled back.
    Ok(())
}
async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
if !self.should_rollback_allocated_regions() {
if !self.should_rollback() {
return Ok(());
}
@@ -596,6 +664,8 @@ impl RepartitionProcedure {
let table_lock = TableLock::Write(table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
self.rollback_partition_metadata().await?;
let table_route_value = self.context.get_table_route_value().await?;
let original_region_routes = region_routes(table_id, table_route_value.get_inner_ref())?;
let mut current_region_routes = original_region_routes.clone();
@@ -738,20 +808,28 @@ impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
ddl_ctx: &DdlContext,
table_name: TableName,
table_id: TableId,
from_exprs: Vec<String>,
source: RepartitionSource,
to_exprs: Vec<String>,
timeout: Option<Duration>,
) -> std::result::Result<BoxedProcedure, BoxedError> {
let persistent_ctx = PersistentContext::new(table_name, table_id, timeout);
let from_exprs = from_exprs
.iter()
.map(|e| {
PartitionExpr::from_json_str(e)
.context(error::DeserializePartitionExprSnafu)?
.context(error::EmptyPartitionExprSnafu)
})
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)?;
let from = match source {
RepartitionSource::Partitioned { exprs } => {
let exprs = exprs
.iter()
.map(|e| {
PartitionExpr::from_json_str(e)
.context(error::DeserializePartitionExprSnafu)?
.context(error::EmptyPartitionExprSnafu)
})
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)?;
RepartitionFrom::Partitioned { exprs }
}
RepartitionSource::Unpartitioned { partition_columns } => {
RepartitionFrom::Unpartitioned { partition_columns }
}
};
let to_exprs = to_exprs
.iter()
.map(|e| {
@@ -763,7 +841,7 @@ impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
.map_err(BoxedError::new)?;
let procedure = RepartitionProcedure::new(
from_exprs,
from,
to_exprs,
Context::new(
ddl_ctx,
@@ -860,6 +938,9 @@ mod tests {
new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr,
test_region_route, test_region_wal_options,
};
use crate::procedure::repartition::update_partition_metadata::{
PartitionMetadataUpdate, UpdatePartitionMetadata,
};
fn test_plan(table_id: TableId) -> RepartitionPlanEntry {
RepartitionPlanEntry {
@@ -927,6 +1008,15 @@ mod tests {
.unwrap()
}
/// Reads the partition key indices currently stored in the table info of
/// the table `ctx` points at; panics if the table info cannot be read.
async fn table_partition_key_indices(ctx: &Context) -> Vec<usize> {
    let table_info_value = ctx.get_table_info_value().await.unwrap();
    table_info_value.table_info.meta.partition_key_indices
}
/// Builds a `RepartitionProcedure` directly from the given state and context,
/// so a test can start from an arbitrary state.
fn test_procedure(state: Box<dyn State>, context: Context) -> RepartitionProcedure {
RepartitionProcedure { state, context }
}
@@ -965,34 +1055,43 @@ mod tests {
}
#[test]
fn test_should_rollback_allocated_regions() {
fn test_should_rollback_after_metadata_update() {
let env = TestingEnv::new();
let table_id = 1024;
let procedure = test_procedure(
Box::new(RepartitionStart::new(vec![], vec![])),
Box::new(RepartitionStart::new(
RepartitionFrom::Partitioned { exprs: vec![] },
vec![],
)),
test_context(&env, table_id),
);
assert!(!procedure.should_rollback_allocated_regions());
assert!(!procedure.should_rollback());
let procedure = test_procedure(
Box::new(UpdatePartitionMetadata::new(vec![])),
test_context(&env, table_id),
);
assert!(procedure.should_rollback());
let procedure = test_procedure(
Box::new(AllocateRegion::new(vec![])),
test_context(&env, table_id),
);
assert!(procedure.should_rollback_allocated_regions());
assert!(procedure.should_rollback());
let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id));
assert!(procedure.should_rollback_allocated_regions());
assert!(procedure.should_rollback());
let procedure =
test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id));
assert!(procedure.should_rollback_allocated_regions());
assert!(procedure.should_rollback());
let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id));
assert!(!procedure.should_rollback_allocated_regions());
assert!(!procedure.should_rollback());
let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id));
assert!(!procedure.should_rollback_allocated_regions());
assert!(!procedure.should_rollback());
}
#[test]
@@ -1048,6 +1147,68 @@ mod tests {
);
}
#[test]
fn test_persistent_context_partition_metadata_update_serde_default() {
    // The payload has no `partition_metadata_update` key, so deserialization
    // must fall back to the field's serde default.
    let payload_without_update = r#"{
"catalog_name":"test_catalog",
"schema_name":"test_schema",
"table_name":"test_table",
"table_id":1024,
"plans":[],
"timeout":"120s"
}"#;

    let restored = serde_json::from_str::<PersistentContext>(payload_without_update).unwrap();

    assert!(restored.partition_metadata_update.is_none());
}
#[tokio::test]
async fn test_repartition_rollback_removes_partition_metadata_indices() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
env.create_physical_table_metadata_for_repartition(
table_id,
vec![test_region_route(RegionId::new(table_id, 1), "")],
test_region_wal_options(&[1]),
)
.await;
let mut context = new_parent_context(&env, node_manager, table_id);
let current = context.get_raw_table_info_value().await.unwrap();
let mut table_info = current.table_info.clone();
table_info.meta.partition_key_indices = vec![0, 1];
context
.update_table_info(&current, current.update(table_info))
.await
.unwrap();
context.persistent_ctx.partition_metadata_update = Some(PartitionMetadataUpdate {
partition_key_indices: vec![0],
});
let mut procedure = RepartitionProcedure {
state: Box::new(UpdatePartitionMetadata::new(vec![])),
context,
};
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
assert_eq!(
procedure
.context
.get_table_info_value()
.await
.unwrap()
.table_info
.meta
.partition_key_indices,
vec![1]
);
}
#[tokio::test]
async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
let env = TestingEnv::new();
@@ -1708,7 +1869,9 @@ mod tests {
let context = new_parent_context(&env, node_manager, table_id);
let mut procedure = RepartitionProcedure::new(
vec![range_expr("x", 0, 100)],
RepartitionFrom::Partitioned {
exprs: vec![range_expr("x", 0, 100)],
},
vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
context,
);
@@ -1810,6 +1973,226 @@ mod tests {
);
}
// Drives an unpartitioned repartition step by step until its only
// subprocedure fails, then checks that rollback restores both the table
// partition key indices and the region routes.
#[tokio::test]
async fn test_repartition_procedure_flow_unpartitioned_failed_and_full_rollback() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
// Start from a table with a single region whose partition expression is empty.
env.create_physical_table_metadata_for_repartition(
table_id,
vec![test_region_route(RegionId::new(table_id, 1), "")],
test_region_wal_options(&[1]),
)
.await;
let context = new_parent_context(&env, node_manager, table_id);
let to_exprs = vec![range_expr("col1", 0, 50), range_expr("col1", 50, 100)];
let mut procedure = RepartitionProcedure::new(
RepartitionFrom::Unpartitioned {
partition_columns: vec!["col1".to_string()],
},
to_exprs.clone(),
context,
);
// Step 1: the start state records the pending partition metadata update
// and moves to `UpdatePartitionMetadata`.
let start_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(start_status.need_persist());
assert_parent_state::<UpdatePartitionMetadata>(&procedure);
assert_eq!(
procedure
.context
.persistent_ctx
.partition_metadata_update
.as_ref()
.unwrap()
.partition_key_indices,
vec![0]
);
// Step 2: the partition key indices are now visible in the table info.
let update_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(update_status.need_persist());
assert_parent_state::<AllocateRegion>(&procedure);
assert_eq!(
table_partition_key_indices(&procedure.context).await,
vec![0]
);
// Step 3: the procedure stays in `AllocateRegion` and the plan is stored
// in the persistent context.
let build_allocate_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(build_allocate_status.need_persist());
assert_parent_state::<AllocateRegion>(&procedure);
assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
let plan = &procedure.context.persistent_ctx.plans[0];
assert_eq!(
plan.source_regions,
vec![SourceRegionDescriptor::Default {
region_id: RegionId::new(table_id, 1)
}]
);
// The existing region is reused as the first target; only the second
// target region is newly allocated.
assert_eq!(plan.target_regions.len(), 2);
assert_eq!(plan.target_regions[0].region_id, RegionId::new(table_id, 1));
assert_eq!(plan.target_regions[0].partition_expr, to_exprs[0]);
assert_eq!(
plan.allocated_region_ids,
vec![plan.target_regions[1].region_id]
);
assert!(plan.pending_deallocate_region_ids.is_empty());
assert_eq!(plan.transition_map, vec![vec![0, 1]]);
let target_regions = plan.target_regions.clone();
// Step 4: the allocation is executed and the procedure moves to `Dispatch`.
let execute_allocate_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(execute_allocate_status.need_persist());
assert_parent_state::<Dispatch>(&procedure);
let region_routes = current_parent_region_routes(&procedure.context).await;
assert_eq!(region_routes.len(), 2);
// At this point the reused region's route still has an empty partition
// expression, while the allocated region's route carries its target expression.
assert_eq!(
region_route_by_id(&region_routes, target_regions[0].region_id)
.region
.partition_expr(),
""
);
assert_eq!(
region_route_by_id(&region_routes, target_regions[1].region_id)
.region
.partition_expr(),
to_exprs[1].as_json_str().unwrap()
);
// Step 5: dispatch yields exactly one subprocedure and moves to `Collect`.
let dispatch_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
assert_eq!(subprocedure_ids.len(), 1);
assert_parent_state::<Collect>(&procedure);
// Step 6: report the subprocedure as failed; collecting must return a
// non-retryable error and leave the procedure in `Collect`.
let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
MockError::new(StatusCode::Internal),
)));
let collect_ctx = procedure_context_with_receivers(HashMap::from([(
subprocedure_ids[0],
procedure_state_receiver(failed_state),
)]));
let err = procedure.execute(&collect_ctx).await.unwrap_err();
assert!(!err.is_retry_later());
assert_parent_state::<Collect>(&procedure);
// Step 7: rollback clears the partition key indices and restores the
// original single region route.
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
assert!(
table_partition_key_indices(&procedure.context)
.await
.is_empty()
);
assert_eq!(
current_parent_region_routes(&procedure.context).await,
vec![test_region_route(RegionId::new(table_id, 1), "")]
);
}
// Runs an unpartitioned repartition until its subprocedure fails, then rolls
// back twice and checks that the second rollback leaves the metadata exactly
// as the first one did.
#[tokio::test]
async fn test_repartition_procedure_flow_unpartitioned_rollback_is_idempotent() {
let env = TestingEnv::new();
let table_id = 1024;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
// Start from a table with a single region whose partition expression is empty.
env.create_physical_table_metadata_for_repartition(
table_id,
vec![test_region_route(RegionId::new(table_id, 1), "")],
test_region_wal_options(&[1]),
)
.await;
let context = new_parent_context(&env, node_manager, table_id);
let mut procedure = RepartitionProcedure::new(
RepartitionFrom::Unpartitioned {
partition_columns: vec!["col1".to_string()],
},
vec![range_expr("col1", 0, 50), range_expr("col1", 50, 100)],
context,
);
// Four executions advance the procedure to the point where the partition
// key indices and the second region route exist (asserted below).
procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
assert_eq!(
table_partition_key_indices(&procedure.context).await,
vec![0]
);
assert_eq!(
current_parent_region_routes(&procedure.context).await.len(),
2
);
// The fifth execution dispatches one subprocedure and moves to `Collect`.
let dispatch_status = procedure
.execute(&TestingEnv::procedure_context())
.await
.unwrap();
let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
assert_eq!(subprocedure_ids.len(), 1);
assert_parent_state::<Collect>(&procedure);
// Report the subprocedure as failed so that collecting returns a
// non-retryable error.
let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
MockError::new(StatusCode::Internal),
)));
let collect_ctx = procedure_context_with_receivers(HashMap::from([(
subprocedure_ids[0],
procedure_state_receiver(failed_state),
)]));
let err = procedure.execute(&collect_ctx).await.unwrap_err();
assert!(!err.is_retry_later());
// First rollback: snapshot the resulting metadata.
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
let once_indices = table_partition_key_indices(&procedure.context).await;
let once_routes = current_parent_region_routes(&procedure.context).await;
// Second rollback: must succeed and leave the same metadata behind.
procedure
.rollback(&TestingEnv::procedure_context())
.await
.unwrap();
let twice_indices = table_partition_key_indices(&procedure.context).await;
let twice_routes = current_parent_region_routes(&procedure.context).await;
assert_eq!(once_indices, twice_indices);
assert_eq!(once_routes, twice_routes);
assert!(twice_indices.is_empty());
assert_eq!(
twice_routes,
vec![test_region_route(RegionId::new(table_id, 1), "")]
);
}
#[tokio::test]
async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() {
common_telemetry::init_default_ut_logging();
@@ -1852,7 +2235,9 @@ mod tests {
let context = new_parent_context(&env, node_manager, table_id);
let mut procedure = RepartitionProcedure::new(
vec![range_expr("x", 0, 100)],
RepartitionFrom::Partitioned {
exprs: vec![range_expr("x", 0, 100)],
},
vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
context,
);

View File

@@ -20,7 +20,7 @@ use common_telemetry::debug;
use partition::collider::Collider;
use partition::expr::PartitionExpr;
use partition::subtask::{self, RepartitionSubtask};
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use tokio::time::Instant;
use uuid::Uuid;
@@ -29,20 +29,57 @@ use crate::error::{self, Result};
use crate::procedure::repartition::allocate_region::AllocateRegion;
use crate::procedure::repartition::plan::{AllocationPlanEntry, SourceRegionDescriptor};
use crate::procedure::repartition::repartition_end::RepartitionEnd;
use crate::procedure::repartition::update_partition_metadata::{
PartitionMetadataUpdate, UpdatePartitionMetadata,
};
use crate::procedure::repartition::{Context, State};
/// The source side of a repartition as stored in [`RepartitionStart`].
///
/// `Serialize` is derived, while `Deserialize` is implemented by hand so
/// that a bare list of partition expressions is also accepted and read as
/// `Partitioned`.
#[derive(Debug, Clone, Serialize)]
pub enum RepartitionFrom {
/// The table is already partitioned; `exprs` are the source partition expressions.
Partitioned { exprs: Vec<PartitionExpr> },
/// The table has no partition key indices yet; `partition_columns` are the
/// column names that get resolved to partition key indices.
Unpartitioned { partition_columns: Vec<String> },
}
impl<'de> Deserialize<'de> for RepartitionFrom {
    /// Accepts either the current enum representation or a bare list of
    /// partition expressions; the bare list is read as `Partitioned`.
    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        /// Mirror of [`RepartitionFrom`] with a derived deserializer.
        #[derive(Deserialize)]
        enum CurrentRepartitionFrom {
            Partitioned { exprs: Vec<PartitionExpr> },
            Unpartitioned { partition_columns: Vec<String> },
        }

        /// Tries the current representation first, then the legacy list.
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum RepartitionFromRepr {
            Current(CurrentRepartitionFrom),
            Legacy(Vec<PartitionExpr>),
        }

        let repr = RepartitionFromRepr::deserialize(deserializer)?;
        let from = match repr {
            RepartitionFromRepr::Legacy(exprs) => Self::Partitioned { exprs },
            RepartitionFromRepr::Current(current) => match current {
                CurrentRepartitionFrom::Partitioned { exprs } => Self::Partitioned { exprs },
                CurrentRepartitionFrom::Unpartitioned { partition_columns } => {
                    Self::Unpartitioned { partition_columns }
                }
            },
        };
        Ok(from)
    }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionStart {
from_exprs: Vec<PartitionExpr>,
#[serde(alias = "from_exprs")]
from: RepartitionFrom,
to_exprs: Vec<PartitionExpr>,
}
impl RepartitionStart {
pub fn new(from_exprs: Vec<PartitionExpr>, to_exprs: Vec<PartitionExpr>) -> Self {
Self {
from_exprs,
to_exprs,
}
pub fn new(from: RepartitionFrom, to_exprs: Vec<PartitionExpr>) -> Self {
Self { from, to_exprs }
}
}
@@ -54,6 +91,13 @@ impl State for RepartitionStart {
ctx: &mut Context,
_: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
ensure!(
!self.to_exprs.is_empty(),
error::InvalidArgumentsSnafu {
err_msg: "Repartition expects non-empty target partition expressions".to_string(),
}
);
let timer = Instant::now();
let (physical_table_id, table_route) = ctx
.table_metadata_manager
@@ -72,7 +116,8 @@ impl State for RepartitionStart {
}
);
let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;
let from_exprs = self.prepare_from(ctx).await?;
let plans = Self::build_plan(&table_route, from_exprs, &self.to_exprs)?;
let plan_count = plans.len();
let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum();
let total_target_regions: usize =
@@ -91,10 +136,17 @@ impl State for RepartitionStart {
return Ok((Box::new(RepartitionEnd), Status::done()));
}
Ok((
Box::new(AllocateRegion::new(plans)),
Status::executing(false),
))
if ctx.persistent_ctx.partition_metadata_update.is_some() {
Ok((
Box::new(UpdatePartitionMetadata::new(plans)),
Status::executing(true),
))
} else {
Ok((
Box::new(AllocateRegion::new(plans)),
Status::executing(false),
))
}
}
fn as_any(&self) -> &dyn Any {
@@ -103,6 +155,65 @@ impl State for RepartitionStart {
}
impl RepartitionStart {
/// Returns the source partition expressions to build the plan from.
///
/// For a partitioned source these are the stored expressions. For an
/// unpartitioned source, `prepare_unpartitioned` is run first and an empty
/// slice is returned.
async fn prepare_from<'a>(&'a self, ctx: &mut Context) -> Result<&'a [PartitionExpr]> {
    let source_exprs: &'a [PartitionExpr] = match &self.from {
        RepartitionFrom::Unpartitioned { partition_columns } => {
            Self::prepare_unpartitioned(ctx, partition_columns).await?;
            &[]
        }
        RepartitionFrom::Partitioned { exprs } => exprs,
    };
    Ok(source_exprs)
}
async fn prepare_unpartitioned(ctx: &mut Context, partition_columns: &[String]) -> Result<()> {
if ctx.persistent_ctx.partition_metadata_update.is_some() {
return Ok(());
}
ensure!(
!partition_columns.is_empty(),
error::InvalidArgumentsSnafu {
err_msg: "Unpartitioned repartition expects non-empty partition columns"
.to_string(),
}
);
let table_info_value = ctx.get_table_info_value().await?;
ensure!(
table_info_value
.table_info
.meta
.partition_key_indices
.is_empty(),
error::InvalidArgumentsSnafu {
err_msg: format!(
"Unpartitioned repartition expects an unpartitioned table, but table {} has partition key indices: {:?}",
ctx.persistent_ctx.table_id,
table_info_value.table_info.meta.partition_key_indices
),
}
);
let schema = &table_info_value.table_info.meta.schema;
let partition_key_indices = partition_columns
.iter()
.map(|column_name| {
schema.column_index_by_name(column_name).with_context(|| {
error::InvalidArgumentsSnafu {
err_msg: format!(
"Partition column {} not found in table {}",
column_name, ctx.persistent_ctx.table_id
),
}
})
})
.collect::<Result<Vec<_>>>()?;
ctx.persistent_ctx.partition_metadata_update =
Some(PartitionMetadataUpdate::new(partition_key_indices));
Ok(())
}
pub(crate) fn build_plan(
physical_route: &PhysicalTableRouteValue,
from_exprs: &[PartitionExpr],
@@ -227,7 +338,6 @@ impl RepartitionStart {
),
}
);
let source_region = &physical_route.region_routes[0].region;
ensure!(
source_region.partition_expr().is_empty(),
@@ -247,20 +357,37 @@ impl RepartitionStart {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::test_util::MockDatanodeManager;
use datatypes::prelude::Value;
use partition::expr::{Operand, RestrictedOp};
use store_api::storage::RegionId;
use super::*;
use crate::procedure::repartition::test_util::{range_expr, test_region_route};
use crate::procedure::repartition::test_util::{
TestingEnv, new_parent_context, range_expr, test_region_route, test_region_wal_options,
};
/// Wraps the given region routes in a `PhysicalTableRouteValue`.
fn physical_route(region_routes: Vec<RegionRoute>) -> PhysicalTableRouteValue {
PhysicalTableRouteValue::new(region_routes)
}
/// Creates metadata for a table with one region (region number 1, empty
/// partition expression) and returns a parent context for that table.
async fn new_test_context(env: &TestingEnv, table_id: u32) -> Context {
    let single_region_route = vec![test_region_route(RegionId::new(table_id, 1), "")];
    let wal_options = test_region_wal_options(&[1]);
    env.create_physical_table_metadata_for_repartition(table_id, single_region_route, wal_options)
        .await;

    let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
    new_parent_context(env, datanode_manager, table_id)
}
#[test]
fn test_build_plan_with_default_source_region() {
let table_id = 1024;
@@ -367,4 +494,216 @@ mod tests {
)]
);
}
#[test]
fn test_repartition_start_deserializes_legacy_from_exprs() {
    // Legacy layout: the source is a bare expression list under `from_exprs`.
    let legacy_source = vec![range_expr("x", 0, 100)];
    let targets = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
    let payload = serde_json::json!({
        "from_exprs": legacy_source,
        "to_exprs": targets,
    })
    .to_string();

    let restored: RepartitionStart = serde_json::from_str(&payload).unwrap();

    match restored.from {
        RepartitionFrom::Partitioned { exprs } => {
            assert_eq!(exprs, vec![range_expr("x", 0, 100)]);
        }
        RepartitionFrom::Unpartitioned { .. } => panic!("expected partition source"),
    }
}
#[test]
fn test_repartition_start_deserializes_current_from() {
    // Serialize a state in the current layout and read it back.
    let original = RepartitionStart::new(
        RepartitionFrom::Unpartitioned {
            partition_columns: vec!["col1".to_string()],
        },
        vec![range_expr("col1", 0, 50)],
    );
    let payload = serde_json::to_string(&original).unwrap();

    let restored: RepartitionStart = serde_json::from_str(&payload).unwrap();

    match restored.from {
        RepartitionFrom::Unpartitioned { partition_columns } => {
            assert_eq!(partition_columns, vec!["col1"]);
        }
        RepartitionFrom::Partitioned { .. } => panic!("expected unpartitioned source"),
    }
}
// A partitioned source must go straight to `AllocateRegion` without
// recording a partition metadata update.
#[tokio::test]
async fn test_partitioned_source_does_not_initialize_partition_metadata_update() {
let env = TestingEnv::new();
let table_id = 1024;
// The single existing region already carries the source partition expression.
env.create_physical_table_metadata_for_repartition(
table_id,
vec![test_region_route(
RegionId::new(table_id, 1),
&range_expr("x", 0, 100).as_json_str().unwrap(),
)],
test_region_wal_options(&[1]),
)
.await;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let mut ctx = new_parent_context(&env, node_manager, table_id);
let mut state = RepartitionStart::new(
RepartitionFrom::Partitioned {
exprs: vec![range_expr("x", 0, 100)],
},
vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
);
let (next, status) = state
.next(&mut ctx, &TestingEnv::procedure_context())
.await
.unwrap();
// No persist is requested and no metadata update is recorded on this path.
assert!(!status.need_persist());
assert!(next.as_any().is::<AllocateRegion>());
assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
}
#[tokio::test]
async fn test_unpartitioned_source_initializes_partition_metadata_update() {
    let testing_env = TestingEnv::new();
    let table_id = 1024;
    let mut ctx = new_test_context(&testing_env, table_id).await;

    let source = RepartitionFrom::Unpartitioned {
        partition_columns: vec!["col2".to_string(), "col1".to_string()],
    };
    let targets = vec![range_expr("col2", 0, 50), range_expr("col2", 50, 100)];
    let mut start = RepartitionStart::new(source, targets);

    let procedure_ctx = TestingEnv::procedure_context();
    let (next_state, status) = start.next(&mut ctx, &procedure_ctx).await.unwrap();

    assert!(status.need_persist());
    assert!(next_state.as_any().is::<UpdatePartitionMetadata>());

    // The recorded indices are expected in the order the columns were
    // requested (`col2` first, then `col1`).
    let pending = ctx
        .persistent_ctx
        .partition_metadata_update
        .as_ref()
        .unwrap();
    assert_eq!(pending.partition_key_indices, vec![2, 0]);
}
#[tokio::test]
async fn test_unpartitioned_source_rejects_existing_partition_metadata() {
    let testing_env = TestingEnv::new();
    let table_id = 1024;
    let mut ctx = new_test_context(&testing_env, table_id).await;

    // Give the table a partition key up front so it no longer counts as
    // unpartitioned.
    let stored = ctx.get_raw_table_info_value().await.unwrap();
    let mut partitioned_info = stored.table_info.clone();
    partitioned_info.meta.partition_key_indices = vec![0];
    ctx.update_table_info(&stored, stored.update(partitioned_info))
        .await
        .unwrap();

    let source = RepartitionFrom::Unpartitioned {
        partition_columns: vec!["col1".to_string()],
    };
    let mut start = RepartitionStart::new(source, vec![range_expr("col1", 0, 50)]);

    let procedure_ctx = TestingEnv::procedure_context();
    let failure = start.next(&mut ctx, &procedure_ctx).await.unwrap_err();

    let message = failure.to_string();
    assert!(message.contains("expects an unpartitioned table"));
    assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
}
#[tokio::test]
async fn test_repartition_start_rejects_empty_target_partition_exprs() {
    let testing_env = TestingEnv::new();
    let table_id = 1024;
    let mut ctx = new_test_context(&testing_env, table_id).await;

    // No target expressions at all: the start state must refuse to proceed.
    let source = RepartitionFrom::Partitioned { exprs: vec![] };
    let mut start = RepartitionStart::new(source, vec![]);

    let procedure_ctx = TestingEnv::procedure_context();
    let failure = start.next(&mut ctx, &procedure_ctx).await.unwrap_err();

    let message = failure.to_string();
    assert!(message.contains("non-empty target partition expressions"));
}
#[tokio::test]
async fn test_unpartitioned_source_rejects_empty_target_partition_exprs() {
    let testing_env = TestingEnv::new();
    let table_id = 1024;
    let mut ctx = new_test_context(&testing_env, table_id).await;

    // Valid partition columns, but nothing to repartition into.
    let source = RepartitionFrom::Unpartitioned {
        partition_columns: vec!["col1".to_string()],
    };
    let mut start = RepartitionStart::new(source, vec![]);

    let procedure_ctx = TestingEnv::procedure_context();
    let failure = start.next(&mut ctx, &procedure_ctx).await.unwrap_err();

    let message = failure.to_string();
    assert!(message.contains("non-empty target partition expressions"));
    // The failure must not leave a pending metadata update behind.
    assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
}
#[tokio::test]
async fn test_unpartitioned_source_rejects_empty_partition_columns() {
    let testing_env = TestingEnv::new();
    let table_id = 1024;
    let mut ctx = new_test_context(&testing_env, table_id).await;

    // Target expressions are present, but no partition columns were named.
    let source = RepartitionFrom::Unpartitioned {
        partition_columns: vec![],
    };
    let mut start = RepartitionStart::new(source, vec![range_expr("col1", 0, 50)]);

    let procedure_ctx = TestingEnv::procedure_context();
    let failure = start.next(&mut ctx, &procedure_ctx).await.unwrap_err();

    let message = failure.to_string();
    assert!(message.contains("non-empty partition columns"));
    assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
}
#[tokio::test]
async fn test_unpartitioned_source_rejects_missing_partition_column() {
    let testing_env = TestingEnv::new();
    let table_id = 1024;
    let mut ctx = new_test_context(&testing_env, table_id).await;

    // `missing_col` is expected not to resolve against the test table.
    let source = RepartitionFrom::Unpartitioned {
        partition_columns: vec!["missing_col".to_string()],
    };
    let mut start = RepartitionStart::new(source, vec![range_expr("col1", 0, 50)]);

    let procedure_ctx = TestingEnv::procedure_context();
    let failure = start.next(&mut ctx, &procedure_ctx).await.unwrap_err();

    let message = failure.to_string();
    assert!(message.contains("Partition column missing_col not found"));
    assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
}
}

View File

@@ -0,0 +1,251 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_meta::lock_key::TableLock;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::error::{self, Result};
use crate::procedure::repartition::allocate_region::AllocateRegion;
use crate::procedure::repartition::plan::AllocationPlanEntry;
use crate::procedure::repartition::{Context, State};
/// A pending change to a table's partition key.
///
/// The value is kept in the repartition procedure's persistent context and is
/// consumed by [`UpdatePartitionMetadata`], which writes
/// `partition_key_indices` into the table info's metadata.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PartitionMetadataUpdate {
// Partition key indices to store in the table info.
// NOTE(review): presumably positions of the partition columns within the
// table schema — confirm against the code that builds this value.
pub partition_key_indices: Vec<usize>,
}
impl PartitionMetadataUpdate {
/// Creates an update that will set the given partition key indices.
pub fn new(partition_key_indices: Vec<usize>) -> Self {
Self {
partition_key_indices,
}
}
}
/// Repartition procedure state that applies a pending
/// `partition_metadata_update` from the persistent context to the table info
/// and then hands over to [`AllocateRegion`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdatePartitionMetadata {
// Allocation plan carried through this state unchanged; it is cloned into
// the `AllocateRegion` state that follows.
plan_entries: Vec<AllocationPlanEntry>,
}
impl UpdatePartitionMetadata {
/// Creates the state with the plan entries to forward to [`AllocateRegion`].
pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
Self { plan_entries }
}
}
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpdatePartitionMetadata {
    /// Applies the pending partition metadata update, if any, and moves on to
    /// [`AllocateRegion`].
    ///
    /// * No pending update: transitions immediately without persisting.
    /// * Table info already holds the requested indices: nothing is written,
    ///   the transition is persisted.
    /// * Table info has no partition key indices: the requested indices are
    ///   written, the transition is persisted.
    ///
    /// # Errors
    ///
    /// Returns an invalid-arguments error when the pending update carries no
    /// indices, or when the table already has partition key indices that
    /// differ from the requested ones. Errors from reading or updating the
    /// table info are propagated.
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let desired_indices = match ctx.persistent_ctx.partition_metadata_update.as_ref() {
            Some(pending) => pending.partition_key_indices.clone(),
            None => {
                return Ok((
                    Box::new(AllocateRegion::new(self.plan_entries.clone())),
                    Status::executing(false),
                ));
            }
        };
        ensure!(
            !desired_indices.is_empty(),
            error::InvalidArgumentsSnafu {
                err_msg:
                    "Repartition partition metadata update expects non-empty partition key indices"
                        .to_string(),
            }
        );

        // Take the table write lock before reading the table info, so the
        // comparison and the update below happen under the same guard.
        let table_id = ctx.persistent_ctx.table_id;
        let lock_key = TableLock::Write(table_id).into();
        let _guard = procedure_ctx.provider.acquire_lock(&lock_key).await;

        let raw_table_info = ctx.get_raw_table_info_value().await?;
        let stored_indices = &raw_table_info.table_info.meta.partition_key_indices;

        // When the stored indices already match (e.g. on replay) there is
        // nothing to write; otherwise the table must still be unpartitioned.
        if *stored_indices != desired_indices {
            ensure!(
                stored_indices.is_empty(),
                error::InvalidArgumentsSnafu {
                    err_msg: format!(
                        "Repartition partition metadata update expects an unpartitioned table, but table {} has partition key indices: {:?}",
                        table_id, stored_indices
                    ),
                }
            );

            let mut updated_info = raw_table_info.table_info.clone();
            updated_info.meta.partition_key_indices = desired_indices;
            ctx.update_table_info(&raw_table_info, raw_table_info.update(updated_info))
                .await?;
            // The cache is not invalidated here: the following AllocateRegion
            // step updates the table route and invalidates the cache.
        }

        Ok((
            Box::new(AllocateRegion::new(self.plan_entries.clone())),
            Status::executing(true),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use store_api::storage::{RegionId, TableId};

    use super::*;
    use crate::procedure::repartition::plan::SourceRegionDescriptor;
    use crate::procedure::repartition::test_util::{
        TestingEnv, new_parent_context, range_expr, test_region_route, test_region_wal_options,
    };

    /// Creates a single-region table with an empty partition expression and a
    /// context whose pending update asks for partition key indices `[0]`.
    async fn new_test_context(env: &TestingEnv, table_id: TableId) -> Context {
        let region_routes = vec![test_region_route(RegionId::new(table_id, 1), "")];
        env.create_physical_table_metadata_for_repartition(
            table_id,
            region_routes,
            test_region_wal_options(&[1]),
        )
        .await;

        let datanodes = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let mut context = new_parent_context(env, datanodes, table_id);
        context.persistent_ctx.partition_metadata_update =
            Some(PartitionMetadataUpdate::new(vec![0]));
        context
    }

    /// Overwrites the partition key indices stored in the table info.
    async fn set_partition_key_indices(ctx: &Context, partition_key_indices: Vec<usize>) {
        let stored = ctx.get_raw_table_info_value().await.unwrap();
        let mut replacement = stored.table_info.clone();
        replacement.meta.partition_key_indices = partition_key_indices;
        ctx.update_table_info(&stored, stored.update(replacement))
            .await
            .unwrap();
    }

    /// Reads back the partition key indices stored in the table info.
    async fn partition_key_indices(ctx: &Context) -> Vec<usize> {
        let table_info_value = ctx.get_table_info_value().await.unwrap();
        table_info_value.table_info.meta.partition_key_indices
    }

    #[tokio::test]
    async fn test_update_partition_metadata_applies_to_unpartitioned_table() {
        let testing_env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&testing_env, table_id).await;
        let mut step = UpdatePartitionMetadata::new(vec![]);

        let procedure_ctx = TestingEnv::procedure_context();
        let (next_state, status) = step.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(status.need_persist());
        assert!(next_state.as_any().is::<AllocateRegion>());
        assert_eq!(partition_key_indices(&ctx).await, vec![0]);
    }

    #[tokio::test]
    async fn test_update_partition_metadata_replay_is_noop() {
        let testing_env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&testing_env, table_id).await;
        // Simulate a previous run that already wrote the requested indices.
        set_partition_key_indices(&ctx, vec![0]).await;
        let mut step = UpdatePartitionMetadata::new(vec![]);

        let procedure_ctx = TestingEnv::procedure_context();
        let (next_state, status) = step.next(&mut ctx, &procedure_ctx).await.unwrap();

        assert!(status.need_persist());
        assert!(next_state.as_any().is::<AllocateRegion>());
        assert_eq!(partition_key_indices(&ctx).await, vec![0]);
    }

    #[tokio::test]
    async fn test_update_partition_metadata_rejects_empty_partition_key_indices() {
        let testing_env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&testing_env, table_id).await;
        ctx.persistent_ctx.partition_metadata_update =
            Some(PartitionMetadataUpdate::new(vec![]));
        let mut step = UpdatePartitionMetadata::new(vec![]);

        let procedure_ctx = TestingEnv::procedure_context();
        let failure = step.next(&mut ctx, &procedure_ctx).await.unwrap_err();

        let message = failure.to_string();
        assert!(message.contains("non-empty partition key indices"));
        assert!(partition_key_indices(&ctx).await.is_empty());
    }

    #[tokio::test]
    async fn test_update_partition_metadata_rejects_other_partition_keys() {
        let testing_env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&testing_env, table_id).await;
        // The table already has a partition key that differs from the
        // requested `[0]`.
        set_partition_key_indices(&ctx, vec![1]).await;
        let mut step = UpdatePartitionMetadata::new(vec![]);

        let procedure_ctx = TestingEnv::procedure_context();
        let failure = step.next(&mut ctx, &procedure_ctx).await.unwrap_err();

        let message = failure.to_string();
        assert!(message.contains("expects an unpartitioned table"));
        assert_eq!(partition_key_indices(&ctx).await, vec![1]);
    }

    #[tokio::test]
    async fn test_update_partition_metadata_preserves_plan_entries() {
        let testing_env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&testing_env, table_id).await;

        let entry = AllocationPlanEntry {
            group_id: uuid::Uuid::new_v4(),
            source_regions: vec![SourceRegionDescriptor::Default {
                region_id: RegionId::new(table_id, 1),
            }],
            target_partition_exprs: vec![range_expr("x", 0, 10)],
            transition_map: vec![vec![0]],
        };
        let mut step = UpdatePartitionMetadata::new(vec![entry]);

        let procedure_ctx = TestingEnv::procedure_context();
        let (next_state, _) = step.next(&mut ctx, &procedure_ctx).await.unwrap();

        // Only the type of the next state is checked here.
        assert!(next_state.as_any().is::<AllocateRegion>());
    }
}

View File

@@ -17,7 +17,7 @@ use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::ddl::DdlContext;
use common_meta::ddl_manager::RepartitionProcedureFactory;
use common_meta::ddl_manager::{RepartitionProcedureFactory, RepartitionSource};
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::state_store::KvStateStore;
@@ -66,7 +66,7 @@ impl RepartitionProcedureFactory for StandaloneRepartitionProcedureFactory {
_ddl_ctx: &DdlContext,
_table_name: TableName,
_table_id: TableId,
_from_exprs: Vec<String>,
_source: RepartitionSource,
_to_exprs: Vec<String>,
_timeout: Option<Duration>,
) -> std::result::Result<BoxedProcedure, BoxedError> {

View File

@@ -55,6 +55,24 @@ macro_rules! repartition_tests {
}
}
// Generated wrapper: runs `repartition::test_partition_unpartitioned_mito`
// against the `$service` storage type, but only when `test_on()` reports true
// for that storage type.
#[tokio::test(flavor = "multi_thread")]
async fn [< test_partition_unpartitioned_mito >]() {
let store_type = tests_integration::test_util::StorageType::$service;
if store_type.test_on() {
common_telemetry::init_default_ut_logging();
$crate::repartition::test_partition_unpartitioned_mito(store_type).await;
}
}
// Generated wrapper: runs `repartition::test_partition_unpartitioned_metric`
// against the `$service` storage type, but only when `test_on()` reports true
// for that storage type.
#[tokio::test(flavor = "multi_thread")]
async fn [< test_partition_unpartitioned_metric >]() {
let store_type = tests_integration::test_util::StorageType::$service;
if store_type.test_on() {
common_telemetry::init_default_ut_logging();
$crate::repartition::test_partition_unpartitioned_metric(store_type).await;
}
}
#[tokio::test(flavor = "multi_thread")]
async fn [< test_repartition_metric >]() {
let store_type = tests_integration::test_util::StorageType::$service;
@@ -78,6 +96,274 @@ macro_rules! repartition_tests {
};
}
/// End-to-end check that a mito table created without partitions can be
/// partitioned with `ALTER TABLE ... PARTITION ON COLUMNS`.
///
/// Steps: create the table and insert three rows, run the ALTER statement,
/// verify the existing rows are still readable, verify
/// `information_schema.partitions` lists the three new rules, insert one row
/// per new partition range, verify all six rows, then drop the table.
pub async fn test_partition_unpartitioned_mito(store_type: StorageType) {
info!(
"test_partition_unpartitioned_mito: store_type: {:?}",
store_type
);
// Cluster setup: three datanodes with a no-op WAL configuration.
let cluster_name = "test_partition_unpartitioned_mito";
let (store_config, _guard) = get_test_store_config(&store_type);
let datanodes = 3u64;
let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await;
// With file storage the builder is given a single shared home directory.
if matches!(store_type, StorageType::File) {
let home_dir = create_temp_dir("test_partition_unpartitioned_mito_data_home");
builder = builder.with_shared_home_dir(Arc::new(home_dir));
}
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_datanode_wal_config(DatanodeWalConfig::Noop)
.build(true)
.await;
let query_ctx = QueryContext::arc();
let instance = cluster.fe_instance();
// Create the table without any partition clause.
let sql = r#"
CREATE TABLE `partition_unpartitioned_mito_table`(
`id` INT,
`city` STRING,
`ts` TIMESTAMP TIME INDEX,
PRIMARY KEY(`id`, `city`)
) ENGINE = mito;
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Seed rows before partitioning.
let sql = r#"
INSERT INTO `partition_unpartitioned_mito_table` VALUES
(1, 'New York', '2022-01-01 00:00:00'),
(10, 'Paris', '2022-01-01 00:00:00'),
(20, 'Beijing', '2022-01-01 00:00:00');
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Partition the table on `id` into three ranges.
let sql = r#"
ALTER TABLE `partition_unpartitioned_mito_table` PARTITION ON COLUMNS (`id`) (
`id` < 10,
`id` >= 10 AND `id` < 20,
`id` >= 20
);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Wait for cache invalidation.
tokio::time::sleep(Duration::from_millis(500)).await;
// Rows written before the ALTER must still be returned.
let result = run_sql(
instance,
"SELECT * FROM `partition_unpartitioned_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+----+----------+---------------------+
| id | city | ts |
+----+----------+---------------------+
| 1 | New York | 2022-01-01T00:00:00 |
| 10 | Paris | 2022-01-01T00:00:00 |
| 20 | Beijing | 2022-01-01T00:00:00 |
+----+----------+---------------------+";
check_output_stream(result.data, expected).await;
// The new partition rules must be visible in information_schema.
let result = run_sql(
instance,
"\
SELECT partition_expression, partition_description \
FROM information_schema.partitions \
WHERE table_name = 'partition_unpartitioned_mito_table' \
ORDER BY partition_ordinal_position;",
query_ctx.clone(),
)
.await
.unwrap();
let expected_partitions = r#"+----------------------+-----------------------+
| partition_expression | partition_description |
+----------------------+-----------------------+
| id | id < 10 |
| id | id >= 10 AND id < 20 |
| id | id >= 20 |
+----------------------+-----------------------+"#;
check_output_stream(result.data, expected_partitions).await;
// Insert one row falling into each of the three new ranges.
let sql = r#"
INSERT INTO `partition_unpartitioned_mito_table` VALUES
(5, 'London', '2022-01-02 00:00:00'),
(15, 'Tokyo', '2022-01-02 00:00:00'),
(25, 'Shanghai', '2022-01-02 00:00:00');
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Old and new rows must all be returned.
let result = run_sql(
instance,
"SELECT * FROM `partition_unpartitioned_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+----+----------+---------------------+
| id | city | ts |
+----+----------+---------------------+
| 1 | New York | 2022-01-01T00:00:00 |
| 5 | London | 2022-01-02T00:00:00 |
| 10 | Paris | 2022-01-01T00:00:00 |
| 15 | Tokyo | 2022-01-02T00:00:00 |
| 20 | Beijing | 2022-01-01T00:00:00 |
| 25 | Shanghai | 2022-01-02T00:00:00 |
+----+----------+---------------------+";
check_output_stream(result.data, expected).await;
// Clean up.
run_sql(
instance,
"DROP TABLE `partition_unpartitioned_mito_table`",
query_ctx.clone(),
)
.await
.unwrap();
}
/// End-to-end check that an unpartitioned metric-engine physical table can be
/// partitioned with `ALTER TABLE ... PARTITION ON COLUMNS`.
///
/// Steps: create a physical table and a logical table on top of it, insert
/// two rows through the logical table, run the ALTER statement on the
/// physical table, verify the existing rows through the logical table, verify
/// `information_schema.partitions` for the physical table, insert one row per
/// new partition range, verify all four rows, then drop both tables.
pub async fn test_partition_unpartitioned_metric(store_type: StorageType) {
info!(
"test_partition_unpartitioned_metric: store_type: {:?}",
store_type
);
// Cluster setup: three datanodes with a no-op WAL configuration.
let cluster_name = "test_partition_unpartitioned_metric";
let (store_config, _guard) = get_test_store_config(&store_type);
let datanodes = 3u64;
let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await;
// With file storage the builder is given a single shared home directory.
if matches!(store_type, StorageType::File) {
let home_dir = create_temp_dir("test_partition_unpartitioned_metric_data_home");
builder = builder.with_shared_home_dir(Arc::new(home_dir));
}
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_datanode_wal_config(DatanodeWalConfig::Noop)
.build(true)
.await;
let query_ctx = QueryContext::arc();
let instance = cluster.fe_instance();
// Physical metric table, created without any partition clause.
let sql = r#"
CREATE TABLE `partition_unpartitioned_metric_phy`(
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
`host` STRING PRIMARY KEY
) ENGINE = metric
WITH (
"physical_metric_table" = "true"
);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Logical table placed on the physical table above.
let sql = r#"
CREATE TABLE `partition_unpartitioned_metric_log`(
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
`host` STRING PRIMARY KEY
) ENGINE = metric
WITH (
"on_physical_table" = "partition_unpartitioned_metric_phy"
);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Seed rows through the logical table before partitioning.
let sql = r#"
INSERT INTO `partition_unpartitioned_metric_log` (`host`, `ts`, `val`) VALUES
('a_host', '2022-01-01 00:00:00', 1),
('z_host', '2022-01-01 00:00:00', 2);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Partition the physical table on `host` into two ranges.
let sql = r#"
ALTER TABLE `partition_unpartitioned_metric_phy` PARTITION ON COLUMNS (`host`) (
`host` < 'm',
`host` >= 'm'
);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Wait for cache invalidation.
tokio::time::sleep(Duration::from_millis(500)).await;
// Rows written before the ALTER must still be returned via the logical table.
let result = run_sql(
instance,
"SELECT * FROM `partition_unpartitioned_metric_log` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+--------+---------------------+-----+
| host | ts | val |
+--------+---------------------+-----+
| a_host | 2022-01-01T00:00:00 | 1.0 |
| z_host | 2022-01-01T00:00:00 | 2.0 |
+--------+---------------------+-----+";
check_output_stream(result.data, expected).await;
// The new partition rules must be visible for the physical table.
let result = run_sql(
instance,
"\
SELECT partition_expression, partition_description \
FROM information_schema.partitions \
WHERE table_name = 'partition_unpartitioned_metric_phy' \
ORDER BY partition_ordinal_position;",
query_ctx.clone(),
)
.await
.unwrap();
let expected_partitions = r#"+----------------------+-----------------------+
| partition_expression | partition_description |
+----------------------+-----------------------+
| host | host < m |
| host | host >= m |
+----------------------+-----------------------+"#;
check_output_stream(result.data, expected_partitions).await;
// Insert one row falling into each of the two new ranges.
let sql = r#"
INSERT INTO `partition_unpartitioned_metric_log` (`host`, `ts`, `val`) VALUES
('b_host', '2022-01-02 00:00:00', 3),
('x_host', '2022-01-02 00:00:00', 4);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Old and new rows must all be returned.
let result = run_sql(
instance,
"SELECT * FROM `partition_unpartitioned_metric_log` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+--------+---------------------+-----+
| host | ts | val |
+--------+---------------------+-----+
| a_host | 2022-01-01T00:00:00 | 1.0 |
| b_host | 2022-01-02T00:00:00 | 3.0 |
| x_host | 2022-01-02T00:00:00 | 4.0 |
| z_host | 2022-01-01T00:00:00 | 2.0 |
+--------+---------------------+-----+";
check_output_stream(result.data, expected).await;
// Clean up: the logical table is dropped before the physical table.
run_sql(
instance,
"DROP TABLE `partition_unpartitioned_metric_log`",
query_ctx.clone(),
)
.await
.unwrap();
run_sql(
instance,
"DROP TABLE `partition_unpartitioned_metric_phy`",
query_ctx.clone(),
)
.await
.unwrap();
}
async fn trigger_table_gc(metasrv: &Arc<Metasrv>, table_name: &str) {
info!("triggering table gc for table: {}", table_name);
let table_metadata_manager = metasrv.table_metadata_manager();