Compare commits

...

3 Commits

Author SHA1 Message Date
jeremyhi
0c373062c2 feat: make grpc can handle metric engine request (#7508)
Signed-off-by: jeremyhi <fengjiachun@gmail.com>
2026-01-09 09:42:27 +00:00
Weny Xu
567d3e66e9 feat: integrate repartition procedure into DdlManager (#7548)
* feat: add repartition procedure factory support to DdlManager

- Introduce RepartitionProcedureFactory trait for creating and registering
  repartition procedures
- Implement DefaultRepartitionProcedureFactory for metasrv with full support
- Implement StandaloneRepartitionProcedureFactory for standalone (unsupported)
- Add procedure loader registration for RepartitionProcedure and
  RepartitionGroupProcedure
- Add helper methods to TableMetadataAllocator for allocator access
- Add error types for repartition procedure operations
- Update DdlManager to accept and use RepartitionProcedureFactoryRef

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: integrate repartition procedure into DdlManager

- Add submit_repartition_task() to handle repartition from alter table
- Route Repartition operations in submit_alter_table_task() to repartition factory
- Refactor: rename submit_procedure() to execute_procedure_and_wait()
- Make all DDL operations wait for completion by default
- Add submit_procedure() for fire-and-forget submissions
- Add CreateRepartitionProcedure error type
- Add placeholder Repartition handling in grpc-expr (unsupported)
- Update greptime-proto dependency

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: implement ALTER TABLE REPARTITION procedure submission

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(repartition): handle central region in apply staging manifest

- Introduce ApplyStagingManifestInstructions struct to organize instructions
- Add special handling for central region when applying staging manifests
- Transition state from UpdateMetadata to RepartitionEnd after applying staging manifests
- Remove next_state() method in RepartitionStart and inline state transitions
- Improve logging and expression serialization in DDL statement executor
- Move repartition tests from standalone to distributed test suite

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2026-01-09 08:37:21 +00:00
discord9
63284a5081 chore: sqlness fmt (#7551)
chore

Signed-off-by: discord9 <discord9@163.com>
2026-01-09 07:18:23 +00:00
28 changed files with 873 additions and 188 deletions

3
Cargo.lock generated
View File

@@ -5467,7 +5467,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0e316b86d765e4718d6f0ca77b1ad179f222b822#0e316b86d765e4718d6f0ca77b1ad179f222b822"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=58aeee49267fb1eafa6f9123f9d0c47dd0f62722#58aeee49267fb1eafa6f9123f9d0c47dd0f62722"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",
@@ -12512,6 +12512,7 @@ dependencies = [
"servers",
"snafu 0.8.6",
"store-api",
"table",
"tokio",
]

View File

@@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0e316b86d765e4718d6f0ca77b1ad179f222b822" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "58aeee49267fb1eafa6f9123f9d0c47dd0f62722" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -64,8 +64,8 @@ use plugins::frontend::context::{
use plugins::standalone::context::DdlManagerConfigureContext;
use servers::tls::{TlsMode, TlsOption, merge_tls_option};
use snafu::ResultExt;
use standalone::StandaloneInformationExtension;
use standalone::options::StandaloneOptions;
use standalone::{StandaloneInformationExtension, StandaloneRepartitionProcedureFactory};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
@@ -509,8 +509,13 @@ impl StartCommand {
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
};
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
.context(error::InitDdlManagerSnafu)?;
let ddl_manager = DdlManager::try_new(
ddl_context,
procedure_manager.clone(),
Arc::new(StandaloneRepartitionProcedureFactory),
true,
)
.context(error::InitDdlManagerSnafu)?;
let ddl_manager = if let Some(configurator) =
plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()

View File

@@ -34,7 +34,7 @@ use table::requests::{
};
use crate::error::{
ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
self, ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
InvalidSetFulltextOptionRequestSnafu, InvalidSetSkippingIndexOptionRequestSnafu,
InvalidSetTableOptionRequestSnafu, InvalidUnsetTableOptionRequestSnafu,
MissingAlterIndexOptionSnafu, MissingFieldSnafu, MissingTableMetaSnafu,
@@ -251,6 +251,10 @@ pub fn alter_expr_to_request(
.collect::<Result<Vec<_>>>()?;
AlterKind::SetDefaults { defaults }
}
Kind::Repartition(_) => error::UnexpectedSnafu {
err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
}
.fail()?,
};
let request = AlterTableRequest {

View File

@@ -161,6 +161,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unexpected: {err_msg}"))]
Unexpected {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -188,6 +195,7 @@ impl ErrorExt for Error {
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::SqlCommon { source, .. } => source.status_code(),
Error::MissingTableMeta { .. } => StatusCode::Unexpected,
Error::Unexpected { .. } => StatusCode::Unexpected,
}
}

View File

@@ -22,7 +22,7 @@ use snafu::OptionExt;
use table::metadata::RawTableInfo;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{InvalidProtoMsgSnafu, Result};
use crate::error::{self, InvalidProtoMsgSnafu, Result};
impl AlterTableProcedure {
/// Makes alter kind proto that all regions can reuse.
@@ -112,6 +112,10 @@ fn create_proto_alter_kind(
Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
Kind::Repartition(_) => error::UnexpectedSnafu {
err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
}
.fail()?,
}
}

View File

@@ -161,7 +161,18 @@ impl TableMetadataAllocator {
})
}
/// Returns the table id allocator.
pub fn table_id_allocator(&self) -> ResourceIdAllocatorRef {
self.table_id_allocator.clone()
}
/// Returns the wal options allocator.
pub fn wal_options_allocator(&self) -> WalOptionsAllocatorRef {
self.wal_options_allocator.clone()
}
/// Returns the region routes allocator.
pub fn region_routes_allocator(&self) -> RegionRoutesAllocatorRef {
self.region_routes_allocator.clone()
}
}

View File

@@ -14,15 +14,19 @@
use std::sync::Arc;
use api::v1::Repartition;
use api::v1::alter_table_expr::Kind;
use common_error::ext::BoxedError;
use common_procedure::{
BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher,
BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
ProcedureWithId, watcher,
};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, info, tracing};
use derive_builder::Builder;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::TableId;
use table::table_name::TableName;
use crate::ddl::alter_database::AlterDatabaseProcedure;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
@@ -40,7 +44,8 @@ use crate::ddl::drop_view::DropViewProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{DdlContext, utils};
use crate::error::{
EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
};
@@ -90,6 +95,7 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade
pub struct DdlManager {
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
repartition_procedure_factory: RepartitionProcedureFactoryRef,
#[cfg(feature = "enterprise")]
trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}
@@ -143,16 +149,37 @@ macro_rules! procedure_loader {
};
}
pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;
pub trait RepartitionProcedureFactory: Send + Sync {
fn create(
&self,
ddl_ctx: &DdlContext,
table_name: TableName,
table_id: TableId,
from_exprs: Vec<String>,
to_exprs: Vec<String>,
) -> std::result::Result<BoxedProcedure, BoxedError>;
fn register_loaders(
&self,
ddl_ctx: &DdlContext,
procedure_manager: &ProcedureManagerRef,
) -> std::result::Result<(), BoxedError>;
}
impl DdlManager {
/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
pub fn try_new(
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
repartition_procedure_factory: RepartitionProcedureFactoryRef,
register_loaders: bool,
) -> Result<Self> {
let manager = Self {
ddl_context,
procedure_manager,
repartition_procedure_factory,
#[cfg(feature = "enterprise")]
trigger_ddl_manager: None,
};
@@ -204,9 +231,63 @@ impl DdlManager {
.context(RegisterProcedureLoaderSnafu { type_name })?;
}
self.repartition_procedure_factory
.register_loaders(&self.ddl_context, &self.procedure_manager)
.context(RegisterRepartitionProcedureLoaderSnafu)?;
Ok(())
}
/// Submits a repartition procedure for the specified table.
///
/// This creates a repartition procedure using the provided `table_id`,
/// `table_name`, and `Repartition` configuration, and then either executes it
/// to completion or just submits it for asynchronous execution.
///
/// The `Repartition` argument contains the original (`from_partition_exprs`)
/// and target (`into_partition_exprs`) partition expressions that define how
/// the table should be repartitioned.
///
/// The `wait` flag controls whether this method waits for the repartition
/// procedure to finish:
/// - If `wait` is `true`, the procedure is executed and this method awaits
/// its completion, returning both the generated `ProcedureId` and the
/// final `Output` of the procedure.
/// - If `wait` is `false`, the procedure is only submitted to the procedure
/// manager for asynchronous execution, and this method returns the
/// `ProcedureId` along with `None` as the output.
async fn submit_repartition_task(
&self,
table_id: TableId,
table_name: TableName,
Repartition {
from_partition_exprs,
into_partition_exprs,
wait,
}: Repartition,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = self
.repartition_procedure_factory
.create(
&context,
table_name,
table_id,
from_partition_exprs,
into_partition_exprs,
)
.context(CreateRepartitionProcedureSnafu)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
if wait {
self.execute_procedure_and_wait(procedure_with_id).await
} else {
self.submit_procedure(procedure_with_id)
.await
.map(|p| (p, None))
}
}
/// Submits and executes an alter table task.
#[tracing::instrument(skip_all)]
pub async fn submit_alter_table_task(
@@ -214,13 +295,28 @@ impl DdlManager {
table_id: TableId,
alter_table_task: AlterTableTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
// make alter_table_task mutable so we can call .take() on its field
let mut alter_table_task = alter_table_task;
if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
&& let Kind::Repartition(repartition) =
alter_table_task.alter_table.kind.take().unwrap()
{
let table_name = TableName::new(
alter_table_task.alter_table.catalog_name,
alter_table_task.alter_table.schema_name,
alter_table_task.alter_table.table_name,
);
return self
.submit_repartition_task(table_id, table_name, repartition)
.await;
}
let context = self.create_context();
let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a create table task.
@@ -235,7 +331,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a `[CreateViewTask]`.
@@ -250,7 +346,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a create multiple logical table tasks.
@@ -267,7 +363,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes alter multiple table tasks.
@@ -284,7 +380,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a drop table task.
@@ -299,7 +395,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a create database task.
@@ -318,7 +414,7 @@ impl DdlManager {
CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a drop table task.
@@ -335,7 +431,7 @@ impl DdlManager {
let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
pub async fn submit_alter_database(
@@ -346,7 +442,7 @@ impl DdlManager {
let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a create flow task.
@@ -360,7 +456,7 @@ impl DdlManager {
let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a drop flow task.
@@ -373,7 +469,7 @@ impl DdlManager {
let procedure = DropFlowProcedure::new(drop_flow, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a drop view task.
@@ -386,7 +482,7 @@ impl DdlManager {
let procedure = DropViewProcedure::new(drop_view, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a truncate table task.
@@ -407,7 +503,7 @@ impl DdlManager {
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a comment on task.
@@ -420,10 +516,11 @@ impl DdlManager {
let procedure = CommentOnProcedure::new(comment_on_task, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
self.execute_procedure_and_wait(procedure_with_id).await
}
async fn submit_procedure(
/// Executes a procedure and waits for the result.
async fn execute_procedure_and_wait(
&self,
procedure_with_id: ProcedureWithId,
) -> Result<(ProcedureId, Option<Output>)> {
@@ -442,6 +539,18 @@ impl DdlManager {
Ok((procedure_id, output))
}
/// Submits a procedure and returns the procedure id.
async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
let procedure_id = procedure_with_id.id;
let _ = self
.procedure_manager
.submit(procedure_with_id)
.await
.context(SubmitProcedureSnafu)?;
Ok(procedure_id)
}
pub async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
@@ -947,8 +1056,12 @@ async fn handle_comment_on_task(
mod tests {
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_procedure::local::LocalManager;
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::{BoxedProcedure, ProcedureManagerRef};
use store_api::storage::TableId;
use table::table_name::TableName;
use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
@@ -959,6 +1072,7 @@ mod tests {
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use crate::ddl_manager::RepartitionProcedureFactory;
use crate::key::TableMetadataManager;
use crate::key::flow::FlowMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -987,6 +1101,30 @@ mod tests {
}
}
struct DummyRepartitionProcedureFactory;
#[async_trait::async_trait]
impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
fn create(
&self,
_ddl_ctx: &DdlContext,
_table_name: TableName,
_table_id: TableId,
_from_exprs: Vec<String>,
_to_exprs: Vec<String>,
) -> std::result::Result<BoxedProcedure, BoxedError> {
unimplemented!()
}
fn register_loaders(
&self,
_ddl_ctx: &DdlContext,
_procedure_manager: &ProcedureManagerRef,
) -> std::result::Result<(), BoxedError> {
Ok(())
}
}
#[test]
fn test_try_new() {
let kv_backend = Arc::new(MemoryKvBackend::new());
@@ -1023,6 +1161,7 @@ mod tests {
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),
Arc::new(DummyRepartitionProcedureFactory),
true,
);

View File

@@ -104,6 +104,20 @@ pub enum Error {
source: common_procedure::error::Error,
},
#[snafu(display("Failed to register repartition procedure loader"))]
RegisterRepartitionProcedureLoader {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to create repartition procedure"))]
CreateRepartitionProcedure {
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to submit procedure"))]
SubmitProcedure {
#[snafu(implicit)]
@@ -1170,6 +1184,8 @@ impl ErrorExt for Error {
PutPoison { source, .. } => source.status_code(),
ConvertColumnDef { source, .. } => source.status_code(),
ProcedureStateReceiver { source, .. } => source.status_code(),
RegisterRepartitionProcedureLoader { source, .. } => source.status_code(),
CreateRepartitionProcedure { source, .. } => source.status_code(),
ParseProcedureId { .. }
| InvalidNumTopics { .. }

View File

@@ -37,6 +37,7 @@ use datafusion::datasource::DefaultTableSource;
use futures::Stream;
use futures::stream::StreamExt;
use query::parser::PromQuery;
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
@@ -73,10 +74,20 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => {
self.handle_row_inserts(requests, ctx.clone(), false, false)
Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
Some(physical_table) => {
self.handle_metric_row_inserts(
requests,
ctx.clone(),
physical_table.to_string(),
)
.await?
}
}
None => {
self.handle_row_inserts(requests, ctx.clone(), false, false)
.await?
}
},
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
Request::Query(query_request) => {

View File

@@ -1038,7 +1038,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to serialize partition expression: {}", source))]
#[snafu(display("Failed to serialize partition expression"))]
SerializePartitionExpr {
#[snafu(source)]
source: partition::error::Error,
@@ -1046,6 +1046,20 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to deserialize partition expression"))]
DeserializePartitionExpr {
#[snafu(source)]
source: partition::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty partition expression"))]
EmptyPartitionExpr {
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Partition expression mismatch, region id: {}, expected: {}, actual: {}",
region_id,
@@ -1163,7 +1177,8 @@ impl ErrorExt for Error {
| Error::MailboxChannelClosed { .. }
| Error::IsNotLeader { .. } => StatusCode::IllegalState,
Error::RetryLaterWithSource { source, .. } => source.status_code(),
Error::SerializePartitionExpr { source, .. } => source.status_code(),
Error::SerializePartitionExpr { source, .. }
| Error::DeserializePartitionExpr { source, .. } => source.status_code(),
Error::Unsupported { .. } => StatusCode::Unsupported,
@@ -1189,7 +1204,8 @@ impl ErrorExt for Error {
| Error::RepartitionSourceRegionMissing { .. }
| Error::RepartitionTargetRegionMissing { .. }
| Error::PartitionExprMismatch { .. }
| Error::RepartitionSourceExprMismatch { .. } => StatusCode::InvalidArguments,
| Error::RepartitionSourceExprMismatch { .. }
| Error::EmptyPartitionExpr { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::InvalidRegionKeyFromUtf8 { .. }

View File

@@ -70,6 +70,7 @@ use crate::metasrv::{
use crate::peer::MetasrvPeerAllocator;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::repartition::DefaultRepartitionProcedureFactory;
use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::region::flush_trigger::RegionFlushTrigger;
@@ -400,8 +401,17 @@ impl MetasrvBuilder {
region_failure_detector_controller,
};
let procedure_manager_c = procedure_manager.clone();
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
.context(error::InitDdlManagerSnafu)?;
let repartition_procedure_factory = Arc::new(DefaultRepartitionProcedureFactory::new(
mailbox.clone(),
options.grpc.server_addr.clone(),
));
let ddl_manager = DdlManager::try_new(
ddl_context,
procedure_manager_c,
repartition_procedure_factory,
true,
)
.context(error::InitDdlManagerSnafu)?;
let ddl_manager = if let Some(configurator) = plugins
.as_ref()

View File

@@ -27,24 +27,38 @@ use std::fmt::Debug;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::DdlContext;
use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
use common_meta::ddl_manager::RepartitionProcedureFactory;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::RegionInfo;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use common_meta::node_manager::NodeManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::error;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use table::table_name::TableName;
use crate::error::{self, Result};
use crate::procedure::repartition::group::{
Context as RepartitionGroupContext, RepartitionGroupProcedure,
};
use crate::procedure::repartition::plan::RepartitionPlanEntry;
use crate::procedure::repartition::repartition_start::RepartitionStart;
use crate::procedure::repartition::utils::get_datanode_table_value;
use crate::service::mailbox::MailboxRef;
@@ -60,6 +74,35 @@ pub struct PersistentContext {
pub plans: Vec<RepartitionPlanEntry>,
}
impl PersistentContext {
pub fn new(
TableName {
catalog_name,
schema_name,
table_name,
}: TableName,
table_id: TableId,
) -> Self {
Self {
catalog_name,
schema_name,
table_name,
table_id,
plans: vec![],
}
}
pub fn lock_key(&self) -> Vec<StringKey> {
vec![
CatalogLock::Read(&self.catalog_name).into(),
SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
TableLock::Write(self.table_id).into(),
TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
]
}
}
#[derive(Clone)]
pub struct Context {
pub persistent_ctx: PersistentContext,
pub table_metadata_manager: TableMetadataManagerRef,
@@ -74,6 +117,26 @@ pub struct Context {
}
impl Context {
pub fn new(
ddl_ctx: &DdlContext,
mailbox: MailboxRef,
server_addr: String,
persistent_ctx: PersistentContext,
) -> Self {
Self {
persistent_ctx,
table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
node_manager: ddl_ctx.node_manager.clone(),
leader_region_registry: ddl_ctx.leader_region_registry.clone(),
mailbox,
server_addr,
cache_invalidator: ddl_ctx.cache_invalidator.clone(),
region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
}
}
/// Retrieves the table route value for the given table id.
///
/// Retry:
@@ -207,3 +270,202 @@ pub(crate) trait State: Sync + Send + Debug {
fn as_any(&self) -> &dyn Any;
}
pub struct RepartitionProcedure {
state: Box<dyn State>,
context: Context,
}
#[derive(Debug, Serialize)]
struct RepartitionData<'a> {
state: &'a dyn State,
persistent_ctx: &'a PersistentContext,
}
#[derive(Debug, Deserialize)]
struct RepartitionDataOwned {
state: Box<dyn State>,
persistent_ctx: PersistentContext,
}
impl RepartitionProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
pub fn new(
from_exprs: Vec<PartitionExpr>,
to_exprs: Vec<PartitionExpr>,
context: Context,
) -> Self {
let state = Box::new(RepartitionStart::new(from_exprs, to_exprs));
Self { state, context }
}
pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
where
F: FnOnce(PersistentContext) -> Context,
{
let RepartitionDataOwned {
state,
persistent_ctx,
} = serde_json::from_str(json).context(FromJsonSnafu)?;
let context = ctx_factory(persistent_ctx);
Ok(Self { state, context })
}
}
#[async_trait::async_trait]
impl Procedure for RepartitionProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
match state.next(&mut self.context, _ctx).await {
Ok((next, status)) => {
*state = next;
Ok(status)
}
Err(e) => {
if e.is_retryable() {
Err(ProcedureError::retry_later(e))
} else {
error!(
e;
"Repartition procedure failed, table id: {}",
self.context.persistent_ctx.table_id,
);
Err(ProcedureError::external(e))
}
}
}
}
fn rollback_supported(&self) -> bool {
// TODO(weny): support rollback.
false
}
fn dump(&self) -> ProcedureResult<String> {
let data = RepartitionData {
state: self.state.as_ref(),
persistent_ctx: &self.context.persistent_ctx,
};
serde_json::to_string(&data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
LockKey::new(self.context.persistent_ctx.lock_key())
}
fn user_metadata(&self) -> Option<UserMetadata> {
// TODO(weny): support user metadata.
None
}
}
pub struct DefaultRepartitionProcedureFactory {
mailbox: MailboxRef,
server_addr: String,
}
impl DefaultRepartitionProcedureFactory {
pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
Self {
mailbox,
server_addr,
}
}
}
impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
fn create(
&self,
ddl_ctx: &DdlContext,
table_name: TableName,
table_id: TableId,
from_exprs: Vec<String>,
to_exprs: Vec<String>,
) -> std::result::Result<BoxedProcedure, BoxedError> {
let persistent_ctx = PersistentContext::new(table_name, table_id);
let from_exprs = from_exprs
.iter()
.map(|e| {
PartitionExpr::from_json_str(e)
.context(error::DeserializePartitionExprSnafu)?
.context(error::EmptyPartitionExprSnafu)
})
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)?;
let to_exprs = to_exprs
.iter()
.map(|e| {
PartitionExpr::from_json_str(e)
.context(error::DeserializePartitionExprSnafu)?
.context(error::EmptyPartitionExprSnafu)
})
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)?;
let procedure = RepartitionProcedure::new(
from_exprs,
to_exprs,
Context::new(
ddl_ctx,
self.mailbox.clone(),
self.server_addr.clone(),
persistent_ctx,
),
);
Ok(Box::new(procedure))
}
fn register_loaders(
&self,
ddl_ctx: &DdlContext,
procedure_manager: &ProcedureManagerRef,
) -> std::result::Result<(), BoxedError> {
// Registers the repartition procedure loader.
let mailbox = self.mailbox.clone();
let server_addr = self.server_addr.clone();
let moved_ddl_ctx = ddl_ctx.clone();
procedure_manager
.register_loader(
RepartitionProcedure::TYPE_NAME,
Box::new(move |json| {
let mailbox = mailbox.clone();
let server_addr = server_addr.clone();
let ddl_ctx = moved_ddl_ctx.clone();
let factory = move |persistent_ctx| {
Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
};
RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
}),
)
.map_err(BoxedError::new)?;
// Registers the repartition group procedure loader.
let mailbox = self.mailbox.clone();
let server_addr = self.server_addr.clone();
let moved_ddl_ctx = ddl_ctx.clone();
procedure_manager
.register_loader(
RepartitionGroupProcedure::TYPE_NAME,
Box::new(move |json| {
let mailbox = mailbox.clone();
let server_addr = server_addr.clone();
let ddl_ctx = moved_ddl_ctx.clone();
let factory = move |persistent_ctx| {
RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
};
RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
}),
)
.map_err(BoxedError::new)?;
Ok(())
}
}

View File

@@ -27,6 +27,7 @@ use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::DdlContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo};
use common_meta::key::table_route::TableRouteValue;
@@ -34,7 +35,7 @@ use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::error::ToJsonSnafu;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status, StringKey, UserMetadata,
@@ -60,8 +61,20 @@ pub struct RepartitionGroupProcedure {
context: Context,
}
#[derive(Debug, Serialize)]
struct RepartitionGroupData<'a> {
persistent_ctx: &'a PersistentContext,
state: &'a dyn State,
}
#[derive(Debug, Deserialize)]
struct RepartitionGroupDataOwned {
persistent_ctx: PersistentContext,
state: Box<dyn State>,
}
impl RepartitionGroupProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
let state = Box::new(RepartitionStart);
@@ -77,12 +90,19 @@ impl RepartitionGroupProcedure {
},
}
}
}
#[derive(Debug, Serialize)]
pub struct RepartitionGroupData<'a> {
persistent_ctx: &'a PersistentContext,
state: &'a dyn State,
pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
where
F: FnOnce(PersistentContext) -> Context,
{
let RepartitionGroupDataOwned {
state,
persistent_ctx,
} = serde_json::from_str(json).context(FromJsonSnafu)?;
let context = ctx_factory(persistent_ctx);
Ok(Self { state, context })
}
}
#[async_trait::async_trait]
@@ -149,6 +169,23 @@ pub struct Context {
pub server_addr: String,
}
impl Context {
pub fn new(
ddl_ctx: &DdlContext,
mailbox: MailboxRef,
server_addr: String,
persistent_ctx: PersistentContext,
) -> Self {
Self {
persistent_ctx,
cache_invalidator: ddl_ctx.cache_invalidator.clone(),
table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
mailbox,
server_addr,
}
}
}
/// The result of the group preparation phase, containing validated region routes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {

View File

@@ -31,7 +31,7 @@ use store_api::storage::RegionId;
use crate::error::{self, Error, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
use crate::procedure::repartition::group::utils::{
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
};
@@ -52,10 +52,7 @@ impl State for ApplyStagingManifest {
) -> Result<(Box<dyn State>, Status)> {
self.apply_staging_manifests(ctx).await?;
Ok((
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),
))
Ok((Box::new(RepartitionEnd), Status::executing(true)))
}
fn as_any(&self) -> &dyn Any {
@@ -63,13 +60,18 @@ impl State for ApplyStagingManifest {
}
}
struct ApplyStagingManifestInstructions {
instructions: HashMap<Peer, Vec<common_meta::instruction::ApplyStagingManifest>>,
central_region_instruction: Option<(Peer, common_meta::instruction::ApplyStagingManifest)>,
}
impl ApplyStagingManifest {
fn build_apply_staging_manifest_instructions(
staging_manifest_paths: &HashMap<RegionId, String>,
target_routes: &[RegionRoute],
targets: &[RegionDescriptor],
central_region_id: RegionId,
) -> Result<HashMap<Peer, Vec<common_meta::instruction::ApplyStagingManifest>>> {
) -> Result<ApplyStagingManifestInstructions> {
let target_partition_expr_by_region = targets
.iter()
.map(|target| {
@@ -86,7 +88,25 @@ impl ApplyStagingManifest {
let target_region_routes_by_peer = group_region_routes_by_peer(target_routes);
let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
for (peer, region_ids) in target_region_routes_by_peer {
let mut central_region_instruction = None;
for (peer, mut region_ids) in target_region_routes_by_peer {
// If the central region is in the target region ids,
// remove it and build the instruction for the central region.
if region_ids.contains(&central_region_id) {
region_ids.retain(|r| *r != central_region_id);
central_region_instruction = Some((
peer.clone(),
common_meta::instruction::ApplyStagingManifest {
region_id: central_region_id,
partition_expr: target_partition_expr_by_region[&central_region_id].clone(),
central_region_id,
manifest_path: staging_manifest_paths[&central_region_id].clone(),
},
));
if region_ids.is_empty() {
continue;
}
}
let apply_staging_manifests = region_ids
.into_iter()
.map(|region_id| common_meta::instruction::ApplyStagingManifest {
@@ -99,7 +119,10 @@ impl ApplyStagingManifest {
instructions.insert(peer.clone(), apply_staging_manifests);
}
Ok(instructions)
Ok(ApplyStagingManifestInstructions {
instructions,
central_region_instruction,
})
}
#[allow(dead_code)]
@@ -112,7 +135,10 @@ impl ApplyStagingManifest {
let targets = &ctx.persistent_ctx.targets;
let target_routes = &prepare_result.target_routes;
let central_region_id = prepare_result.central_region;
let instructions = Self::build_apply_staging_manifest_instructions(
let ApplyStagingManifestInstructions {
instructions,
central_region_instruction,
} = Self::build_apply_staging_manifest_instructions(
staging_manifest_paths,
target_routes,
targets,
@@ -155,8 +181,10 @@ impl ApplyStagingManifest {
let results = join_all(tasks).await;
let result = handle_multiple_results(&results);
match result {
HandleMultipleResult::AllSuccessful => Ok(()),
HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
HandleMultipleResult::AllSuccessful => {
// Coninute
}
HandleMultipleResult::AllRetryable(retryable_errors) => return error::RetryLaterSnafu {
reason: format!(
"All retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}",
table_id, group_id,
@@ -168,7 +196,7 @@ impl ApplyStagingManifest {
),
}
.fail(),
HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
HandleMultipleResult::AllNonRetryable(non_retryable_errors) => return error::UnexpectedSnafu {
violated: format!(
"All non retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}",
table_id, group_id,
@@ -183,7 +211,7 @@ impl ApplyStagingManifest {
HandleMultipleResult::PartialRetryable {
retryable_errors,
non_retryable_errors,
} => error::UnexpectedSnafu {
} => return error::UnexpectedSnafu {
violated: format!(
"Partial retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}, non retryable errors: {:?}",
table_id, group_id,
@@ -201,6 +229,20 @@ impl ApplyStagingManifest {
}
.fail(),
}
if let Some((peer, instruction)) = central_region_instruction {
info!("Applying staging manifest for central region: {:?}", peer);
Self::apply_staging_manifest(
&ctx.mailbox,
&ctx.server_addr,
&peer,
&[instruction],
operation_timeout,
)
.await?;
}
Ok(())
}
async fn apply_staging_manifest(

View File

@@ -128,14 +128,6 @@ impl RepartitionStart {
central_region_datanode,
})
}
#[allow(dead_code)]
fn next_state() -> (Box<dyn State>, Status) {
(
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),
)
}
}
#[async_trait::async_trait]
@@ -157,7 +149,10 @@ impl State for RepartitionStart {
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
if ctx.persistent_ctx.group_prepare_result.is_some() {
return Ok(Self::next_state());
return Ok((
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),
));
}
let table_id = ctx.persistent_ctx.table_id;
let group_id = ctx.persistent_ctx.group_id;
@@ -177,7 +172,10 @@ impl State for RepartitionStart {
ctx.persistent_ctx.targets.len()
);
Ok(Self::next_state())
Ok((
Box::new(UpdateMetadata::ApplyStaging),
Status::executing(true),
))
}
fn as_any(&self) -> &dyn Any {

View File

@@ -736,7 +736,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to deserialize partition expression: {}", source))]
#[snafu(display("Failed to serialize partition expression"))]
SerializePartitionExpr {
#[snafu(implicit)]
location: Location,
source: partition::error::Error,
},
#[snafu(display("Failed to deserialize partition expression"))]
DeserializePartitionExpr {
#[snafu(source)]
source: partition::error::Error,
@@ -983,7 +990,8 @@ impl ErrorExt for Error {
| Error::MissingInsertBody { .. } => StatusCode::Internal,
Error::ExecuteAdminFunction { .. }
| Error::EncodeJson { .. }
| Error::DeserializePartitionExpr { .. } => StatusCode::Unexpected,
| Error::DeserializePartitionExpr { .. }
| Error::SerializePartitionExpr { .. } => StatusCode::Unexpected,
Error::ViewNotFound { .. }
| Error::ViewInfoNotFound { .. }
| Error::TableNotFound { .. } => StatusCode::TableNotFound,

View File

@@ -16,9 +16,11 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_table_expr::Kind;
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
use api::v1::{
AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def,
AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
Repartition, column_def,
};
#[cfg(feature = "enterprise")]
use api::v1::{
@@ -88,8 +90,9 @@ use crate::error::{
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
ViewAlreadyExistsSnafu,
};
use crate::expr_helper::{self, RepartitionRequest};
use crate::statement::StatementExecutor;
@@ -1408,20 +1411,56 @@ impl StatementExecutor {
.context(InvalidPartitionSnafu)?;
info!(
"Repartition table {} (table_id={}) from {:?} to {:?}, new partition count: {}",
"Submitting repartition task for table {} (table_id={}), from {} to {} partitions",
table_ref,
table_id,
from_partition_exprs,
into_partition_exprs,
from_partition_exprs.len(),
new_partition_exprs_len
);
// TODO(weny): Implement the repartition procedure submission.
// The repartition procedure infrastructure is not yet fully integrated with the DDL task system.
NotSupportedSnafu {
feat: "ALTER TABLE REPARTITION",
}
.fail()
let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
let mut json_exprs = Vec::with_capacity(exprs.len());
for expr in exprs {
json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
}
Ok(json_exprs)
};
let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
let req = SubmitDdlTaskRequest {
query_context: query_context.clone(),
task: DdlTask::new_alter_table(AlterTableExpr {
catalog_name: request.catalog_name.clone(),
schema_name: request.schema_name.clone(),
table_name: request.table_name.clone(),
kind: Some(Kind::Repartition(Repartition {
from_partition_exprs: from_partition_exprs_json,
into_partition_exprs: into_partition_exprs_json,
// TODO(weny): allow passing 'wait' from SQL options or QueryContext
wait: true,
})),
}),
};
let invalidate_keys = vec![
CacheIdent::TableId(table_id),
CacheIdent::TableName(TableName::new(
request.catalog_name,
request.schema_name,
request.table_name,
)),
];
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), req)
.await
.context(error::ExecuteDdlSnafu)?;
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate(&Context::default(), &invalidate_keys)
.await
.context(error::InvalidateTableCacheSnafu)?;
Ok(Output::new_with_affected_rows(0))
}
#[tracing::instrument(skip_all)]

View File

@@ -37,4 +37,5 @@ serde.workspace = true
servers.workspace = true
snafu.workspace = true
store-api.workspace = true
table.workspace = true
tokio.workspace = true

View File

@@ -36,6 +36,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Repartition procedure is not supported in standalone mode"))]
NoSupportRepartitionProcedure {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -45,6 +51,7 @@ impl ErrorExt for Error {
match self {
Error::OpenMetadataKvBackend { source, .. } => source.status_code(),
Error::External { source, .. } => source.status_code(),
Error::NoSupportRepartitionProcedure { .. } => StatusCode::Unsupported,
}
}

View File

@@ -20,4 +20,4 @@ pub mod procedure;
pub use information_extension::StandaloneInformationExtension;
pub use metadata::build_metadata_kvbackend;
pub use procedure::build_procedure_manager;
pub use procedure::{StandaloneRepartitionProcedureFactory, build_procedure_manager};

View File

@@ -14,12 +14,19 @@
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_meta::ddl::DdlContext;
use common_meta::ddl_manager::RepartitionProcedureFactory;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::state_store::KvStateStore;
use common_procedure::ProcedureManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_procedure::{BoxedProcedure, ProcedureManagerRef};
use store_api::storage::TableId;
use table::table_name::TableName;
use crate::error::NoSupportRepartitionProcedureSnafu;
/// Builds the procedure manager.
pub fn build_procedure_manager(
@@ -43,3 +50,32 @@ pub fn build_procedure_manager(
None,
))
}
/// No-op implementation of [`RepartitionProcedureFactory`] for standalone mode.
///
/// In standalone deployments, repartition operations are not supported, so
/// this factory always returns a `NoSupportRepartitionProcedure` error
/// from [`RepartitionProcedureFactory::create`] and performs no registration
/// work in [`RepartitionProcedureFactory::register_loaders`].
pub struct StandaloneRepartitionProcedureFactory;
impl RepartitionProcedureFactory for StandaloneRepartitionProcedureFactory {
fn create(
&self,
_ddl_ctx: &DdlContext,
_table_name: TableName,
_table_id: TableId,
_from_exprs: Vec<String>,
_to_exprs: Vec<String>,
) -> std::result::Result<BoxedProcedure, BoxedError> {
Err(BoxedError::new(NoSupportRepartitionProcedureSnafu.build()))
}
fn register_loaders(
&self,
_ddl_ctx: &DdlContext,
_procedure_manager: &ProcedureManagerRef,
) -> std::result::Result<(), BoxedError> {
Ok(())
}
}

View File

@@ -52,6 +52,7 @@ use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use servers::grpc::GrpcOptions;
use snafu::ResultExt;
use standalone::StandaloneRepartitionProcedureFactory;
use standalone::options::StandaloneOptions;
use crate::test_util::{self, StorageType, TestGuard, create_tmp_dir_and_datanode_opts};
@@ -215,6 +216,7 @@ impl GreptimeDbStandaloneBuilder {
flow_id_sequence,
));
let repartition_procedure_factory = Arc::new(StandaloneRepartitionProcedureFactory);
let ddl_manager = Arc::new(
DdlManager::try_new(
DdlContext {
@@ -229,6 +231,7 @@ impl GreptimeDbStandaloneBuilder {
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),
repartition_procedure_factory,
register_procedure_loaders,
)
.unwrap(),

View File

@@ -0,0 +1,101 @@
CREATE TABLE alter_repartition_table(
device_id INT,
area STRING,
ty STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(device_id)
) PARTITION ON COLUMNS (device_id, area) (
device_id < 100,
device_id >= 100 AND device_id < 200,
device_id >= 200
);
Affected Rows: 0
ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);
Affected Rows: 0
SHOW CREATE TABLE alter_repartition_table;
+-------------------------+--------------------------------------------------------+
| Table | Create Table |
+-------------------------+--------------------------------------------------------+
| alter_repartition_table | CREATE TABLE IF NOT EXISTS "alter_repartition_table" ( |
| | "device_id" INT NULL, |
| | "area" STRING NULL, |
| | "ty" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("device_id") |
| | ) |
| | PARTITION ON COLUMNS ("device_id", "area") ( |
| | device_id < 100 AND area < 'South', |
| | device_id >= 100 AND device_id < 200, |
| | device_id >= 200, |
| | device_id < 100 AND area >= 'South' |
| | ) |
| | ENGINE=mito |
| | |
+-------------------------+--------------------------------------------------------+
ALTER TABLE alter_repartition_table MERGE PARTITION (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);
Affected Rows: 0
SHOW CREATE TABLE alter_repartition_table;
+-------------------------+------------------------------------------------------------------------------+
| Table | Create Table |
+-------------------------+------------------------------------------------------------------------------+
| alter_repartition_table | CREATE TABLE IF NOT EXISTS "alter_repartition_table" ( |
| | "device_id" INT NULL, |
| | "area" STRING NULL, |
| | "ty" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("device_id") |
| | ) |
| | PARTITION ON COLUMNS ("device_id", "area") ( |
| | device_id < 100 AND area < 'South' OR device_id < 100 AND area >= 'South', |
| | device_id >= 100 AND device_id < 200, |
| | device_id >= 200 |
| | ) |
| | ENGINE=mito |
| | |
+-------------------------+------------------------------------------------------------------------------+
-- FIXME(weny): Object store is not configured for the test environment,
-- so staging manifest may not be applied in some cases.
-- invalid: empty source clause
ALTER TABLE alter_repartition_table REPARTITION () INTO (
device_id < 100
);
Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected expression inside REPARTITION clause, found: )
-- invalid: more than one INTO clause
ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
device_id < 50
), (
device_id >= 50
) INTO (
device_id >= 50
);
Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected end of REPARTITION clause, found: ,
DROP TABLE alter_repartition_table;
Affected Rows: 0

View File

@@ -10,7 +10,6 @@ CREATE TABLE alter_repartition_table(
device_id >= 200
);
-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
@@ -18,19 +17,17 @@ ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100 AND area >= 'South'
);
-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table SPLIT PARTITION (
device_id < 100
) INTO (
SHOW CREATE TABLE alter_repartition_table;
ALTER TABLE alter_repartition_table MERGE PARTITION (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);
-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table MERGE PARTITION (
device_id < 100,
device_id >= 100 AND device_id < 200
);
SHOW CREATE TABLE alter_repartition_table;
-- FIXME(weny): Object store is not configured for the test environment,
-- so staging manifest may not be applied in some cases.
-- invalid: empty source clause
ALTER TABLE alter_repartition_table REPARTITION () INTO (

View File

@@ -1,66 +0,0 @@
CREATE TABLE alter_repartition_table(
device_id INT,
area STRING,
ty STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(device_id)
) PARTITION ON COLUMNS (device_id, area) (
device_id < 100,
device_id >= 100 AND device_id < 200,
device_id >= 200
);
Affected Rows: 0
-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);
Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION
-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);
Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION
-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table MERGE PARTITION (
device_id < 100,
device_id >= 100 AND device_id < 200
);
Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION
-- invalid: empty source clause
ALTER TABLE alter_repartition_table REPARTITION () INTO (
device_id < 100
);
Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected expression inside REPARTITION clause, found: )
-- invalid: more than one INTO clause
ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
device_id < 50
), (
device_id >= 50
) INTO (
device_id >= 50
);
Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected end of REPARTITION clause, found: ,
DROP TABLE alter_repartition_table;
Affected Rows: 0

View File

@@ -24,46 +24,40 @@ VALUES
Affected Rows: 10
-- should not fail with mismatch timezone
-- SQLNESS REPLACE \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z NOW
SELECT count(*) FROM (
SELECT
now()
FROM
ngx_access_log;
ngx_access_log);
+--------------------------------+
| now() |
+--------------------------------+
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
+--------------------------------+
+----------+
| count(*) |
+----------+
| 10 |
+----------+
-- SQLNESS REPLACE TimestampNanosecond\(\d+ TimestampNanosecond(NOW
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT
EXPLAIN SELECT count(*) FROM (
SELECT
now()
FROM
ngx_access_log;
ngx_access_log);
+---------------+-------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: TimestampNanosecond(NOW, Some("+00:00")) AS now() |
| | TableScan: ngx_access_log |
| | ]] |
| physical_plan | CooperativeExec |
+---------------+-----------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) |
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
| | Projection: TimestampNanosecond(NOW, Some("+00:00")) AS now() |
| | TableScan: ngx_access_log |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------+
| | |
+---------------+-----------------------------------------------------------------------------------+
DROP TABLE ngx_access_log;

View File

@@ -21,18 +21,19 @@ VALUES
("client10", "KR", "2022-01-01 00:00:09");
-- should not fail with mismatch timezone
-- SQLNESS REPLACE \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z NOW
SELECT count(*) FROM (
SELECT
now()
FROM
ngx_access_log;
ngx_access_log);
-- SQLNESS REPLACE TimestampNanosecond\(\d+ TimestampNanosecond(NOW
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT
EXPLAIN SELECT count(*) FROM (
SELECT
now()
FROM
ngx_access_log;
ngx_access_log);
DROP TABLE ngx_access_log;