Mirror of https://github.com/GreptimeTeam/greptimedb.git
Synced 2026-01-11 15:52:55 +00:00

Compare commits: 2 commits, main ... canonicali

| Author | SHA1 | Date |
|---|---|---|
| | d4796066f3 | |
| | d8d29fd86a | |

Cargo.lock (generated): 3 changes
@@ -5467,7 +5467,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=58aeee49267fb1eafa6f9123f9d0c47dd0f62722#58aeee49267fb1eafa6f9123f9d0c47dd0f62722"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0e316b86d765e4718d6f0ca77b1ad179f222b822#0e316b86d765e4718d6f0ca77b1ad179f222b822"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",

@@ -12512,7 +12512,6 @@ dependencies = [
"servers",
"snafu 0.8.6",
"store-api",
"table",
"tokio",
]

@@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "58aeee49267fb1eafa6f9123f9d0c47dd0f62722" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0e316b86d765e4718d6f0ca77b1ad179f222b822" }
hex = "0.4"
http = "1"
humantime = "2.1"

@@ -64,8 +64,8 @@ use plugins::frontend::context::{
use plugins::standalone::context::DdlManagerConfigureContext;
use servers::tls::{TlsMode, TlsOption, merge_tls_option};
use snafu::ResultExt;
use standalone::StandaloneInformationExtension;
use standalone::options::StandaloneOptions;
use standalone::{StandaloneInformationExtension, StandaloneRepartitionProcedureFactory};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};

@@ -509,13 +509,8 @@ impl StartCommand {
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
};

let ddl_manager = DdlManager::try_new(
ddl_context,
procedure_manager.clone(),
Arc::new(StandaloneRepartitionProcedureFactory),
true,
)
.context(error::InitDdlManagerSnafu)?;
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
.context(error::InitDdlManagerSnafu)?;

let ddl_manager = if let Some(configurator) =
plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()

@@ -34,7 +34,7 @@ use table::requests::{
};

use crate::error::{
self, ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
ColumnNotFoundSnafu, InvalidColumnDefSnafu, InvalidIndexOptionSnafu,
InvalidSetFulltextOptionRequestSnafu, InvalidSetSkippingIndexOptionRequestSnafu,
InvalidSetTableOptionRequestSnafu, InvalidUnsetTableOptionRequestSnafu,
MissingAlterIndexOptionSnafu, MissingFieldSnafu, MissingTableMetaSnafu,

@@ -251,10 +251,6 @@ pub fn alter_expr_to_request(
.collect::<Result<Vec<_>>>()?;
AlterKind::SetDefaults { defaults }
}
Kind::Repartition(_) => error::UnexpectedSnafu {
err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
}
.fail()?,
};

let request = AlterTableRequest {

@@ -161,13 +161,6 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unexpected: {err_msg}"))]
Unexpected {
err_msg: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;

@@ -195,7 +188,6 @@ impl ErrorExt for Error {
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::SqlCommon { source, .. } => source.status_code(),
Error::MissingTableMeta { .. } => StatusCode::Unexpected,
Error::Unexpected { .. } => StatusCode::Unexpected,
}
}

@@ -22,7 +22,7 @@ use snafu::OptionExt;
use table::metadata::RawTableInfo;

use crate::ddl::alter_table::AlterTableProcedure;
use crate::error::{self, InvalidProtoMsgSnafu, Result};
use crate::error::{InvalidProtoMsgSnafu, Result};

impl AlterTableProcedure {
/// Makes alter kind proto that all regions can reuse.

@@ -112,10 +112,6 @@ fn create_proto_alter_kind(
Kind::UnsetIndexes(v) => Ok(Some(alter_request::Kind::UnsetIndexes(v.clone()))),
Kind::DropDefaults(v) => Ok(Some(alter_request::Kind::DropDefaults(v.clone()))),
Kind::SetDefaults(v) => Ok(Some(alter_request::Kind::SetDefaults(v.clone()))),
Kind::Repartition(_) => error::UnexpectedSnafu {
err_msg: "Repartition operation should be handled through DdlManager and not converted to AlterTableRequest",
}
.fail()?,
}
}

@@ -161,18 +161,7 @@ impl TableMetadataAllocator {
})
}

/// Returns the table id allocator.
pub fn table_id_allocator(&self) -> ResourceIdAllocatorRef {
self.table_id_allocator.clone()
}

/// Returns the wal options allocator.
pub fn wal_options_allocator(&self) -> WalOptionsAllocatorRef {
self.wal_options_allocator.clone()
}

/// Returns the region routes allocator.
pub fn region_routes_allocator(&self) -> RegionRoutesAllocatorRef {
self.region_routes_allocator.clone()
}
}

@@ -14,19 +14,15 @@

use std::sync::Arc;

use api::v1::Repartition;
use api::v1::alter_table_expr::Kind;
use common_error::ext::BoxedError;
use common_procedure::{
BoxedProcedure, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef,
ProcedureWithId, watcher,
BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher,
};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, info, tracing};
use derive_builder::Builder;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::TableId;
use table::table_name::TableName;

use crate::ddl::alter_database::AlterDatabaseProcedure;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;

@@ -44,8 +40,7 @@ use crate::ddl::drop_view::DropViewProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{DdlContext, utils};
use crate::error::{
CreateRepartitionProcedureSnafu, EmptyDdlTasksSnafu, ProcedureOutputSnafu,
RegisterProcedureLoaderSnafu, RegisterRepartitionProcedureLoaderSnafu, Result,
EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
};

@@ -95,7 +90,6 @@ pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoade
pub struct DdlManager {
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
repartition_procedure_factory: RepartitionProcedureFactoryRef,
#[cfg(feature = "enterprise")]
trigger_ddl_manager: Option<TriggerDdlManagerRef>,
}

@@ -149,37 +143,16 @@ macro_rules! procedure_loader {
};
}

pub type RepartitionProcedureFactoryRef = Arc<dyn RepartitionProcedureFactory>;

pub trait RepartitionProcedureFactory: Send + Sync {
fn create(
&self,
ddl_ctx: &DdlContext,
table_name: TableName,
table_id: TableId,
from_exprs: Vec<String>,
to_exprs: Vec<String>,
) -> std::result::Result<BoxedProcedure, BoxedError>;

fn register_loaders(
&self,
ddl_ctx: &DdlContext,
procedure_manager: &ProcedureManagerRef,
) -> std::result::Result<(), BoxedError>;
}

impl DdlManager {
/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
pub fn try_new(
ddl_context: DdlContext,
procedure_manager: ProcedureManagerRef,
repartition_procedure_factory: RepartitionProcedureFactoryRef,
register_loaders: bool,
) -> Result<Self> {
let manager = Self {
ddl_context,
procedure_manager,
repartition_procedure_factory,
#[cfg(feature = "enterprise")]
trigger_ddl_manager: None,
};

@@ -231,63 +204,9 @@ impl DdlManager {
.context(RegisterProcedureLoaderSnafu { type_name })?;
}

self.repartition_procedure_factory
.register_loaders(&self.ddl_context, &self.procedure_manager)
.context(RegisterRepartitionProcedureLoaderSnafu)?;

Ok(())
}

/// Submits a repartition procedure for the specified table.
///
/// This creates a repartition procedure using the provided `table_id`,
/// `table_name`, and `Repartition` configuration, and then either executes it
/// to completion or just submits it for asynchronous execution.
///
/// The `Repartition` argument contains the original (`from_partition_exprs`)
/// and target (`into_partition_exprs`) partition expressions that define how
/// the table should be repartitioned.
///
/// The `wait` flag controls whether this method waits for the repartition
/// procedure to finish:
/// - If `wait` is `true`, the procedure is executed and this method awaits
///   its completion, returning both the generated `ProcedureId` and the
///   final `Output` of the procedure.
/// - If `wait` is `false`, the procedure is only submitted to the procedure
///   manager for asynchronous execution, and this method returns the
///   `ProcedureId` along with `None` as the output.
async fn submit_repartition_task(
&self,
table_id: TableId,
table_name: TableName,
Repartition {
from_partition_exprs,
into_partition_exprs,
wait,
}: Repartition,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();

let procedure = self
.repartition_procedure_factory
.create(
&context,
table_name,
table_id,
from_partition_exprs,
into_partition_exprs,
)
.context(CreateRepartitionProcedureSnafu)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
if wait {
self.execute_procedure_and_wait(procedure_with_id).await
} else {
self.submit_procedure(procedure_with_id)
.await
.map(|p| (p, None))
}
}

/// Submits and executes an alter table task.
#[tracing::instrument(skip_all)]
pub async fn submit_alter_table_task(

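The doc comment in the hunk above spells out a small contract: the method returns `(ProcedureId, Option<Output>)`, and only a `wait == true` call carries an `Output`. As a rough, self-contained illustration of that contract (toy stand-in types only, not GreptimeDB's real `ProcedureId`, `Output`, or procedure manager):

```rust
/// Toy stand-ins, for illustration only.
type ProcedureId = u64;

#[derive(Debug)]
struct Output(&'static str);

async fn run_to_completion(_id: ProcedureId) -> Output {
    // Stand-in for executing the procedure and awaiting its result.
    Output("repartition finished")
}

fn enqueue(_id: ProcedureId) {
    // Stand-in for handing the procedure to the manager for background execution.
}

/// `wait == true`: execute and return the output; `wait == false`: submit only.
async fn submit_repartition(id: ProcedureId, wait: bool) -> (ProcedureId, Option<Output>) {
    if wait {
        let output = run_to_completion(id).await;
        (id, Some(output))
    } else {
        enqueue(id);
        (id, None)
    }
}
```
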
@@ -295,28 +214,13 @@ impl DdlManager {
|
||||
table_id: TableId,
|
||||
alter_table_task: AlterTableTask,
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
// make alter_table_task mutable so we can call .take() on its field
|
||||
let mut alter_table_task = alter_table_task;
|
||||
if let Some(Kind::Repartition(_)) = alter_table_task.alter_table.kind.as_ref()
|
||||
&& let Kind::Repartition(repartition) =
|
||||
alter_table_task.alter_table.kind.take().unwrap()
|
||||
{
|
||||
let table_name = TableName::new(
|
||||
alter_table_task.alter_table.catalog_name,
|
||||
alter_table_task.alter_table.schema_name,
|
||||
alter_table_task.alter_table.table_name,
|
||||
);
|
||||
return self
|
||||
.submit_repartition_task(table_id, table_name, repartition)
|
||||
.await;
|
||||
}
|
||||
|
||||
let context = self.create_context();
|
||||
|
||||
let procedure = AlterTableProcedure::new(table_id, alter_table_task, context)?;
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a create table task.
|
||||
@@ -331,7 +235,7 @@ impl DdlManager {
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a `[CreateViewTask]`.
|
||||
@@ -346,7 +250,7 @@ impl DdlManager {
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a create multiple logical table tasks.
|
||||
@@ -363,7 +267,7 @@ impl DdlManager {
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes alter multiple table tasks.
|
||||
@@ -380,7 +284,7 @@ impl DdlManager {
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a drop table task.
|
||||
@@ -395,7 +299,7 @@ impl DdlManager {
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a create database task.
|
||||
@@ -414,7 +318,7 @@ impl DdlManager {
|
||||
CreateDatabaseProcedure::new(catalog, schema, create_if_not_exists, options, context);
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a drop table task.
|
||||
@@ -431,7 +335,7 @@ impl DdlManager {
|
||||
let procedure = DropDatabaseProcedure::new(catalog, schema, drop_if_exists, context);
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
pub async fn submit_alter_database(
|
||||
@@ -442,7 +346,7 @@ impl DdlManager {
|
||||
let procedure = AlterDatabaseProcedure::new(alter_database_task, context)?;
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a create flow task.
|
||||
@@ -456,7 +360,7 @@ impl DdlManager {
|
||||
let procedure = CreateFlowProcedure::new(create_flow, query_context, context);
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a drop flow task.
|
||||
@@ -469,7 +373,7 @@ impl DdlManager {
|
||||
let procedure = DropFlowProcedure::new(drop_flow, context);
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a drop view task.
|
||||
@@ -482,7 +386,7 @@ impl DdlManager {
|
||||
let procedure = DropViewProcedure::new(drop_view, context);
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a truncate table task.
|
||||
@@ -503,7 +407,7 @@ impl DdlManager {
|
||||
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Submits and executes a comment on task.
|
||||
@@ -516,11 +420,10 @@ impl DdlManager {
|
||||
let procedure = CommentOnProcedure::new(comment_on_task, context);
|
||||
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
|
||||
|
||||
self.execute_procedure_and_wait(procedure_with_id).await
|
||||
self.submit_procedure(procedure_with_id).await
|
||||
}
|
||||
|
||||
/// Executes a procedure and waits for the result.
|
||||
async fn execute_procedure_and_wait(
|
||||
async fn submit_procedure(
|
||||
&self,
|
||||
procedure_with_id: ProcedureWithId,
|
||||
) -> Result<(ProcedureId, Option<Output>)> {
|
||||
@@ -539,18 +442,6 @@ impl DdlManager {
|
||||
Ok((procedure_id, output))
|
||||
}
|
||||
|
||||
/// Submits a procedure and returns the procedure id.
|
||||
async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
|
||||
let procedure_id = procedure_with_id.id;
|
||||
let _ = self
|
||||
.procedure_manager
|
||||
.submit(procedure_with_id)
|
||||
.await
|
||||
.context(SubmitProcedureSnafu)?;
|
||||
|
||||
Ok(procedure_id)
|
||||
}
|
||||
|
||||
pub async fn submit_ddl_task(
|
||||
&self,
|
||||
ctx: &ExecutorContext,
|
||||
@@ -1056,12 +947,8 @@ async fn handle_comment_on_task(
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_procedure::local::LocalManager;
|
||||
use common_procedure::test_util::InMemoryPoisonStore;
|
||||
use common_procedure::{BoxedProcedure, ProcedureManagerRef};
|
||||
use store_api::storage::TableId;
|
||||
use table::table_name::TableName;
|
||||
|
||||
use super::DdlManager;
|
||||
use crate::cache_invalidator::DummyCacheInvalidator;
|
||||
@@ -1072,7 +959,6 @@ mod tests {
|
||||
use crate::ddl::table_meta::TableMetadataAllocator;
|
||||
use crate::ddl::truncate_table::TruncateTableProcedure;
|
||||
use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
|
||||
use crate::ddl_manager::RepartitionProcedureFactory;
|
||||
use crate::key::TableMetadataManager;
|
||||
use crate::key::flow::FlowMetadataManager;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
@@ -1101,30 +987,6 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyRepartitionProcedureFactory;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RepartitionProcedureFactory for DummyRepartitionProcedureFactory {
|
||||
fn create(
|
||||
&self,
|
||||
_ddl_ctx: &DdlContext,
|
||||
_table_name: TableName,
|
||||
_table_id: TableId,
|
||||
_from_exprs: Vec<String>,
|
||||
_to_exprs: Vec<String>,
|
||||
) -> std::result::Result<BoxedProcedure, BoxedError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn register_loaders(
|
||||
&self,
|
||||
_ddl_ctx: &DdlContext,
|
||||
_procedure_manager: &ProcedureManagerRef,
|
||||
) -> std::result::Result<(), BoxedError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_new() {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::new());
|
||||
@@ -1161,7 +1023,6 @@ mod tests {
|
||||
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
|
||||
},
|
||||
procedure_manager.clone(),
|
||||
Arc::new(DummyRepartitionProcedureFactory),
|
||||
true,
|
||||
);
|
||||
|
||||
|
||||
@@ -104,20 +104,6 @@ pub enum Error {
|
||||
source: common_procedure::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to register repartition procedure loader"))]
|
||||
RegisterRepartitionProcedureLoader {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create repartition procedure"))]
|
||||
CreateRepartitionProcedure {
|
||||
source: BoxedError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to submit procedure"))]
|
||||
SubmitProcedure {
|
||||
#[snafu(implicit)]
|
||||
@@ -1184,8 +1170,6 @@ impl ErrorExt for Error {
|
||||
PutPoison { source, .. } => source.status_code(),
|
||||
ConvertColumnDef { source, .. } => source.status_code(),
|
||||
ProcedureStateReceiver { source, .. } => source.status_code(),
|
||||
RegisterRepartitionProcedureLoader { source, .. } => source.status_code(),
|
||||
CreateRepartitionProcedure { source, .. } => source.status_code(),
|
||||
|
||||
ParseProcedureId { .. }
|
||||
| InvalidNumTopics { .. }
|
||||
|
||||
@@ -37,7 +37,6 @@ use datafusion::datasource::DefaultTableSource;
|
||||
use futures::Stream;
|
||||
use futures::stream::StreamExt;
|
||||
use query::parser::PromQuery;
|
||||
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
|
||||
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
@@ -74,20 +73,10 @@ impl GrpcQueryHandler for Instance {
|
||||
|
||||
let output = match request {
|
||||
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
|
||||
Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
|
||||
Some(physical_table) => {
|
||||
self.handle_metric_row_inserts(
|
||||
requests,
|
||||
ctx.clone(),
|
||||
physical_table.to_string(),
|
||||
)
|
||||
Request::RowInserts(requests) => {
|
||||
self.handle_row_inserts(requests, ctx.clone(), false, false)
|
||||
.await?
|
||||
}
|
||||
None => {
|
||||
self.handle_row_inserts(requests, ctx.clone(), false, false)
|
||||
.await?
|
||||
}
|
||||
},
|
||||
}
|
||||
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
|
||||
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
|
||||
Request::Query(query_request) => {
|
||||
|
||||
@@ -1038,7 +1038,7 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to serialize partition expression"))]
|
||||
#[snafu(display("Failed to serialize partition expression: {}", source))]
|
||||
SerializePartitionExpr {
|
||||
#[snafu(source)]
|
||||
source: partition::error::Error,
|
||||
@@ -1046,20 +1046,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to deserialize partition expression"))]
|
||||
DeserializePartitionExpr {
|
||||
#[snafu(source)]
|
||||
source: partition::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Empty partition expression"))]
|
||||
EmptyPartitionExpr {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Partition expression mismatch, region id: {}, expected: {}, actual: {}",
|
||||
region_id,
|
||||
@@ -1177,8 +1163,7 @@ impl ErrorExt for Error {
|
||||
| Error::MailboxChannelClosed { .. }
|
||||
| Error::IsNotLeader { .. } => StatusCode::IllegalState,
|
||||
Error::RetryLaterWithSource { source, .. } => source.status_code(),
|
||||
Error::SerializePartitionExpr { source, .. }
|
||||
| Error::DeserializePartitionExpr { source, .. } => source.status_code(),
|
||||
Error::SerializePartitionExpr { source, .. } => source.status_code(),
|
||||
|
||||
Error::Unsupported { .. } => StatusCode::Unsupported,
|
||||
|
||||
@@ -1204,8 +1189,7 @@ impl ErrorExt for Error {
|
||||
| Error::RepartitionSourceRegionMissing { .. }
|
||||
| Error::RepartitionTargetRegionMissing { .. }
|
||||
| Error::PartitionExprMismatch { .. }
|
||||
| Error::RepartitionSourceExprMismatch { .. }
|
||||
| Error::EmptyPartitionExpr { .. } => StatusCode::InvalidArguments,
|
||||
| Error::RepartitionSourceExprMismatch { .. } => StatusCode::InvalidArguments,
|
||||
Error::LeaseKeyFromUtf8 { .. }
|
||||
| Error::LeaseValueFromUtf8 { .. }
|
||||
| Error::InvalidRegionKeyFromUtf8 { .. }
|
||||
|
||||
@@ -70,7 +70,6 @@ use crate::metasrv::{
|
||||
use crate::peer::MetasrvPeerAllocator;
|
||||
use crate::procedure::region_migration::DefaultContextFactory;
|
||||
use crate::procedure::region_migration::manager::RegionMigrationManager;
|
||||
use crate::procedure::repartition::DefaultRepartitionProcedureFactory;
|
||||
use crate::procedure::wal_prune::Context as WalPruneContext;
|
||||
use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
|
||||
use crate::region::flush_trigger::RegionFlushTrigger;
|
||||
@@ -401,17 +400,8 @@ impl MetasrvBuilder {
|
||||
region_failure_detector_controller,
|
||||
};
|
||||
let procedure_manager_c = procedure_manager.clone();
|
||||
let repartition_procedure_factory = Arc::new(DefaultRepartitionProcedureFactory::new(
|
||||
mailbox.clone(),
|
||||
options.grpc.server_addr.clone(),
|
||||
));
|
||||
let ddl_manager = DdlManager::try_new(
|
||||
ddl_context,
|
||||
procedure_manager_c,
|
||||
repartition_procedure_factory,
|
||||
true,
|
||||
)
|
||||
.context(error::InitDdlManagerSnafu)?;
|
||||
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
|
||||
.context(error::InitDdlManagerSnafu)?;
|
||||
|
||||
let ddl_manager = if let Some(configurator) = plugins
|
||||
.as_ref()
|
||||
|
||||
@@ -27,38 +27,24 @@ use std::fmt::Debug;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::ddl::DdlContext;
|
||||
use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
|
||||
use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
|
||||
use common_meta::ddl_manager::RepartitionProcedureFactory;
|
||||
use common_meta::instruction::CacheIdent;
|
||||
use common_meta::key::datanode_table::RegionInfo;
|
||||
use common_meta::key::table_info::TableInfoValue;
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
|
||||
use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
|
||||
use common_meta::node_manager::NodeManagerRef;
|
||||
use common_meta::region_keeper::MemoryRegionKeeperRef;
|
||||
use common_meta::region_registry::LeaderRegionRegistryRef;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
|
||||
use common_procedure::{
|
||||
BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
|
||||
ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
|
||||
};
|
||||
use common_telemetry::error;
|
||||
use partition::expr::PartitionExpr;
|
||||
use common_procedure::{Context as ProcedureContext, Status};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::TableId;
|
||||
use table::table_name::TableName;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::procedure::repartition::group::{
|
||||
Context as RepartitionGroupContext, RepartitionGroupProcedure,
|
||||
};
|
||||
use crate::procedure::repartition::plan::RepartitionPlanEntry;
|
||||
use crate::procedure::repartition::repartition_start::RepartitionStart;
|
||||
use crate::procedure::repartition::utils::get_datanode_table_value;
|
||||
use crate::service::mailbox::MailboxRef;
|
||||
|
||||
@@ -74,35 +60,6 @@ pub struct PersistentContext {
|
||||
pub plans: Vec<RepartitionPlanEntry>,
|
||||
}
|
||||
|
||||
impl PersistentContext {
|
||||
pub fn new(
|
||||
TableName {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
}: TableName,
|
||||
table_id: TableId,
|
||||
) -> Self {
|
||||
Self {
|
||||
catalog_name,
|
||||
schema_name,
|
||||
table_name,
|
||||
table_id,
|
||||
plans: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
pub fn lock_key(&self) -> Vec<StringKey> {
|
||||
vec![
|
||||
CatalogLock::Read(&self.catalog_name).into(),
|
||||
SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
|
||||
TableLock::Write(self.table_id).into(),
|
||||
TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Context {
|
||||
pub persistent_ctx: PersistentContext,
|
||||
pub table_metadata_manager: TableMetadataManagerRef,
|
||||
@@ -117,26 +74,6 @@ pub struct Context {
|
||||
}
|
||||
|
||||
impl Context {
|
||||
pub fn new(
|
||||
ddl_ctx: &DdlContext,
|
||||
mailbox: MailboxRef,
|
||||
server_addr: String,
|
||||
persistent_ctx: PersistentContext,
|
||||
) -> Self {
|
||||
Self {
|
||||
persistent_ctx,
|
||||
table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
|
||||
memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
|
||||
node_manager: ddl_ctx.node_manager.clone(),
|
||||
leader_region_registry: ddl_ctx.leader_region_registry.clone(),
|
||||
mailbox,
|
||||
server_addr,
|
||||
cache_invalidator: ddl_ctx.cache_invalidator.clone(),
|
||||
region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
|
||||
wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieves the table route value for the given table id.
|
||||
///
|
||||
/// Retry:
|
||||
@@ -270,202 +207,3 @@ pub(crate) trait State: Sync + Send + Debug {
|
||||
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
}
|
||||
|
||||
pub struct RepartitionProcedure {
|
||||
state: Box<dyn State>,
|
||||
context: Context,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct RepartitionData<'a> {
|
||||
state: &'a dyn State,
|
||||
persistent_ctx: &'a PersistentContext,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RepartitionDataOwned {
|
||||
state: Box<dyn State>,
|
||||
persistent_ctx: PersistentContext,
|
||||
}
|
||||
|
||||
impl RepartitionProcedure {
|
||||
const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
|
||||
|
||||
pub fn new(
|
||||
from_exprs: Vec<PartitionExpr>,
|
||||
to_exprs: Vec<PartitionExpr>,
|
||||
context: Context,
|
||||
) -> Self {
|
||||
let state = Box::new(RepartitionStart::new(from_exprs, to_exprs));
|
||||
|
||||
Self { state, context }
|
||||
}
|
||||
|
||||
pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
|
||||
where
|
||||
F: FnOnce(PersistentContext) -> Context,
|
||||
{
|
||||
let RepartitionDataOwned {
|
||||
state,
|
||||
persistent_ctx,
|
||||
} = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
let context = ctx_factory(persistent_ctx);
|
||||
|
||||
Ok(Self { state, context })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Procedure for RepartitionProcedure {
|
||||
fn type_name(&self) -> &str {
|
||||
Self::TYPE_NAME
|
||||
}
|
||||
|
||||
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
|
||||
let state = &mut self.state;
|
||||
match state.next(&mut self.context, _ctx).await {
|
||||
Ok((next, status)) => {
|
||||
*state = next;
|
||||
Ok(status)
|
||||
}
|
||||
Err(e) => {
|
||||
if e.is_retryable() {
|
||||
Err(ProcedureError::retry_later(e))
|
||||
} else {
|
||||
error!(
|
||||
e;
|
||||
"Repartition procedure failed, table id: {}",
|
||||
self.context.persistent_ctx.table_id,
|
||||
);
|
||||
Err(ProcedureError::external(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn rollback_supported(&self) -> bool {
|
||||
// TODO(weny): support rollback.
|
||||
false
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
let data = RepartitionData {
|
||||
state: self.state.as_ref(),
|
||||
persistent_ctx: &self.context.persistent_ctx,
|
||||
};
|
||||
serde_json::to_string(&data).context(ToJsonSnafu)
|
||||
}
|
||||
|
||||
fn lock_key(&self) -> LockKey {
|
||||
LockKey::new(self.context.persistent_ctx.lock_key())
|
||||
}
|
||||
|
||||
fn user_metadata(&self) -> Option<UserMetadata> {
|
||||
// TODO(weny): support user metadata.
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DefaultRepartitionProcedureFactory {
|
||||
mailbox: MailboxRef,
|
||||
server_addr: String,
|
||||
}
|
||||
|
||||
impl DefaultRepartitionProcedureFactory {
|
||||
pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
|
||||
Self {
|
||||
mailbox,
|
||||
server_addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
|
||||
fn create(
|
||||
&self,
|
||||
ddl_ctx: &DdlContext,
|
||||
table_name: TableName,
|
||||
table_id: TableId,
|
||||
from_exprs: Vec<String>,
|
||||
to_exprs: Vec<String>,
|
||||
) -> std::result::Result<BoxedProcedure, BoxedError> {
|
||||
let persistent_ctx = PersistentContext::new(table_name, table_id);
|
||||
let from_exprs = from_exprs
|
||||
.iter()
|
||||
.map(|e| {
|
||||
PartitionExpr::from_json_str(e)
|
||||
.context(error::DeserializePartitionExprSnafu)?
|
||||
.context(error::EmptyPartitionExprSnafu)
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.map_err(BoxedError::new)?;
|
||||
let to_exprs = to_exprs
|
||||
.iter()
|
||||
.map(|e| {
|
||||
PartitionExpr::from_json_str(e)
|
||||
.context(error::DeserializePartitionExprSnafu)?
|
||||
.context(error::EmptyPartitionExprSnafu)
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.map_err(BoxedError::new)?;
|
||||
|
||||
let procedure = RepartitionProcedure::new(
|
||||
from_exprs,
|
||||
to_exprs,
|
||||
Context::new(
|
||||
ddl_ctx,
|
||||
self.mailbox.clone(),
|
||||
self.server_addr.clone(),
|
||||
persistent_ctx,
|
||||
),
|
||||
);
|
||||
|
||||
Ok(Box::new(procedure))
|
||||
}
|
||||
|
||||
fn register_loaders(
|
||||
&self,
|
||||
ddl_ctx: &DdlContext,
|
||||
procedure_manager: &ProcedureManagerRef,
|
||||
) -> std::result::Result<(), BoxedError> {
|
||||
// Registers the repartition procedure loader.
|
||||
let mailbox = self.mailbox.clone();
|
||||
let server_addr = self.server_addr.clone();
|
||||
let moved_ddl_ctx = ddl_ctx.clone();
|
||||
procedure_manager
|
||||
.register_loader(
|
||||
RepartitionProcedure::TYPE_NAME,
|
||||
Box::new(move |json| {
|
||||
let mailbox = mailbox.clone();
|
||||
let server_addr = server_addr.clone();
|
||||
let ddl_ctx = moved_ddl_ctx.clone();
|
||||
let factory = move |persistent_ctx| {
|
||||
Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
|
||||
};
|
||||
RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
|
||||
}),
|
||||
)
|
||||
.map_err(BoxedError::new)?;
|
||||
|
||||
// Registers the repartition group procedure loader.
|
||||
let mailbox = self.mailbox.clone();
|
||||
let server_addr = self.server_addr.clone();
|
||||
let moved_ddl_ctx = ddl_ctx.clone();
|
||||
procedure_manager
|
||||
.register_loader(
|
||||
RepartitionGroupProcedure::TYPE_NAME,
|
||||
Box::new(move |json| {
|
||||
let mailbox = mailbox.clone();
|
||||
let server_addr = server_addr.clone();
|
||||
let ddl_ctx = moved_ddl_ctx.clone();
|
||||
let factory = move |persistent_ctx| {
|
||||
RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
|
||||
};
|
||||
RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
|
||||
}),
|
||||
)
|
||||
.map_err(BoxedError::new)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@ use std::time::Duration;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::ddl::DdlContext;
|
||||
use common_meta::instruction::CacheIdent;
|
||||
use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo};
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
@@ -35,7 +34,7 @@ use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
|
||||
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::router::RegionRoute;
|
||||
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
|
||||
use common_procedure::error::ToJsonSnafu;
|
||||
use common_procedure::{
|
||||
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
|
||||
Result as ProcedureResult, Status, StringKey, UserMetadata,
|
||||
@@ -61,20 +60,8 @@ pub struct RepartitionGroupProcedure {
|
||||
context: Context,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct RepartitionGroupData<'a> {
|
||||
persistent_ctx: &'a PersistentContext,
|
||||
state: &'a dyn State,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RepartitionGroupDataOwned {
|
||||
persistent_ctx: PersistentContext,
|
||||
state: Box<dyn State>,
|
||||
}
|
||||
|
||||
impl RepartitionGroupProcedure {
|
||||
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
|
||||
const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
|
||||
|
||||
pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
|
||||
let state = Box::new(RepartitionStart);
|
||||
@@ -90,19 +77,12 @@ impl RepartitionGroupProcedure {
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
|
||||
where
|
||||
F: FnOnce(PersistentContext) -> Context,
|
||||
{
|
||||
let RepartitionGroupDataOwned {
|
||||
state,
|
||||
persistent_ctx,
|
||||
} = serde_json::from_str(json).context(FromJsonSnafu)?;
|
||||
let context = ctx_factory(persistent_ctx);
|
||||
|
||||
Ok(Self { state, context })
|
||||
}
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct RepartitionGroupData<'a> {
|
||||
persistent_ctx: &'a PersistentContext,
|
||||
state: &'a dyn State,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -169,23 +149,6 @@ pub struct Context {
|
||||
pub server_addr: String,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
pub fn new(
|
||||
ddl_ctx: &DdlContext,
|
||||
mailbox: MailboxRef,
|
||||
server_addr: String,
|
||||
persistent_ctx: PersistentContext,
|
||||
) -> Self {
|
||||
Self {
|
||||
persistent_ctx,
|
||||
cache_invalidator: ddl_ctx.cache_invalidator.clone(),
|
||||
table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
|
||||
mailbox,
|
||||
server_addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The result of the group preparation phase, containing validated region routes.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct GroupPrepareResult {
|
||||
|
||||
@@ -31,7 +31,7 @@ use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::handler::HeartbeatMailbox;
|
||||
use crate::procedure::repartition::group::repartition_end::RepartitionEnd;
|
||||
use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
|
||||
use crate::procedure::repartition::group::utils::{
|
||||
HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
|
||||
};
|
||||
@@ -52,7 +52,10 @@ impl State for ApplyStagingManifest {
|
||||
) -> Result<(Box<dyn State>, Status)> {
|
||||
self.apply_staging_manifests(ctx).await?;
|
||||
|
||||
Ok((Box::new(RepartitionEnd), Status::executing(true)))
|
||||
Ok((
|
||||
Box::new(UpdateMetadata::ApplyStaging),
|
||||
Status::executing(true),
|
||||
))
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
@@ -60,18 +63,13 @@ impl State for ApplyStagingManifest {
|
||||
}
|
||||
}
|
||||
|
||||
struct ApplyStagingManifestInstructions {
|
||||
instructions: HashMap<Peer, Vec<common_meta::instruction::ApplyStagingManifest>>,
|
||||
central_region_instruction: Option<(Peer, common_meta::instruction::ApplyStagingManifest)>,
|
||||
}
|
||||
|
||||
impl ApplyStagingManifest {
|
||||
fn build_apply_staging_manifest_instructions(
|
||||
staging_manifest_paths: &HashMap<RegionId, String>,
|
||||
target_routes: &[RegionRoute],
|
||||
targets: &[RegionDescriptor],
|
||||
central_region_id: RegionId,
|
||||
) -> Result<ApplyStagingManifestInstructions> {
|
||||
) -> Result<HashMap<Peer, Vec<common_meta::instruction::ApplyStagingManifest>>> {
|
||||
let target_partition_expr_by_region = targets
|
||||
.iter()
|
||||
.map(|target| {
|
||||
@@ -88,25 +86,7 @@ impl ApplyStagingManifest {
|
||||
let target_region_routes_by_peer = group_region_routes_by_peer(target_routes);
|
||||
let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
|
||||
|
||||
let mut central_region_instruction = None;
|
||||
for (peer, mut region_ids) in target_region_routes_by_peer {
|
||||
// If the central region is in the target region ids,
|
||||
// remove it and build the instruction for the central region.
|
||||
if region_ids.contains(¢ral_region_id) {
|
||||
region_ids.retain(|r| *r != central_region_id);
|
||||
central_region_instruction = Some((
|
||||
peer.clone(),
|
||||
common_meta::instruction::ApplyStagingManifest {
|
||||
region_id: central_region_id,
|
||||
partition_expr: target_partition_expr_by_region[¢ral_region_id].clone(),
|
||||
central_region_id,
|
||||
manifest_path: staging_manifest_paths[¢ral_region_id].clone(),
|
||||
},
|
||||
));
|
||||
if region_ids.is_empty() {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
for (peer, region_ids) in target_region_routes_by_peer {
|
||||
let apply_staging_manifests = region_ids
|
||||
.into_iter()
|
||||
.map(|region_id| common_meta::instruction::ApplyStagingManifest {
|
||||
@@ -119,10 +99,7 @@ impl ApplyStagingManifest {
|
||||
instructions.insert(peer.clone(), apply_staging_manifests);
|
||||
}
|
||||
|
||||
Ok(ApplyStagingManifestInstructions {
|
||||
instructions,
|
||||
central_region_instruction,
|
||||
})
|
||||
Ok(instructions)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
@@ -135,10 +112,7 @@ impl ApplyStagingManifest {
|
||||
let targets = &ctx.persistent_ctx.targets;
|
||||
let target_routes = &prepare_result.target_routes;
|
||||
let central_region_id = prepare_result.central_region;
|
||||
let ApplyStagingManifestInstructions {
|
||||
instructions,
|
||||
central_region_instruction,
|
||||
} = Self::build_apply_staging_manifest_instructions(
|
||||
let instructions = Self::build_apply_staging_manifest_instructions(
|
||||
staging_manifest_paths,
|
||||
target_routes,
|
||||
targets,
|
||||
@@ -181,10 +155,8 @@ impl ApplyStagingManifest {
|
||||
let results = join_all(tasks).await;
|
||||
let result = handle_multiple_results(&results);
|
||||
match result {
|
||||
HandleMultipleResult::AllSuccessful => {
|
||||
// Coninute
|
||||
}
|
||||
HandleMultipleResult::AllRetryable(retryable_errors) => return error::RetryLaterSnafu {
|
||||
HandleMultipleResult::AllSuccessful => Ok(()),
|
||||
HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
|
||||
reason: format!(
|
||||
"All retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}",
|
||||
table_id, group_id,
|
||||
@@ -196,7 +168,7 @@ impl ApplyStagingManifest {
|
||||
),
|
||||
}
|
||||
.fail(),
|
||||
HandleMultipleResult::AllNonRetryable(non_retryable_errors) => return error::UnexpectedSnafu {
|
||||
HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"All non retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}",
|
||||
table_id, group_id,
|
||||
@@ -211,7 +183,7 @@ impl ApplyStagingManifest {
|
||||
HandleMultipleResult::PartialRetryable {
|
||||
retryable_errors,
|
||||
non_retryable_errors,
|
||||
} => return error::UnexpectedSnafu {
|
||||
} => error::UnexpectedSnafu {
|
||||
violated: format!(
|
||||
"Partial retryable errors during applying staging manifests for repartition table {}, group id {}: {:?}, non retryable errors: {:?}",
|
||||
table_id, group_id,
|
||||
@@ -229,20 +201,6 @@ impl ApplyStagingManifest {
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
|
||||
if let Some((peer, instruction)) = central_region_instruction {
|
||||
info!("Applying staging manifest for central region: {:?}", peer);
|
||||
Self::apply_staging_manifest(
|
||||
&ctx.mailbox,
|
||||
&ctx.server_addr,
|
||||
&peer,
|
||||
&[instruction],
|
||||
operation_timeout,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn apply_staging_manifest(
|
||||
|
||||
@@ -128,6 +128,14 @@ impl RepartitionStart {
|
||||
central_region_datanode,
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn next_state() -> (Box<dyn State>, Status) {
|
||||
(
|
||||
Box::new(UpdateMetadata::ApplyStaging),
|
||||
Status::executing(true),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -149,10 +157,7 @@ impl State for RepartitionStart {
|
||||
_procedure_ctx: &ProcedureContext,
|
||||
) -> Result<(Box<dyn State>, Status)> {
|
||||
if ctx.persistent_ctx.group_prepare_result.is_some() {
|
||||
return Ok((
|
||||
Box::new(UpdateMetadata::ApplyStaging),
|
||||
Status::executing(true),
|
||||
));
|
||||
return Ok(Self::next_state());
|
||||
}
|
||||
let table_id = ctx.persistent_ctx.table_id;
|
||||
let group_id = ctx.persistent_ctx.group_id;
|
||||
@@ -172,10 +177,7 @@ impl State for RepartitionStart {
|
||||
ctx.persistent_ctx.targets.len()
|
||||
);
|
||||
|
||||
Ok((
|
||||
Box::new(UpdateMetadata::ApplyStaging),
|
||||
Status::executing(true),
|
||||
))
|
||||
Ok(Self::next_state())
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
|
||||
@@ -736,14 +736,7 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to serialize partition expression"))]
|
||||
SerializePartitionExpr {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: partition::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to deserialize partition expression"))]
|
||||
#[snafu(display("Failed to deserialize partition expression: {}", source))]
|
||||
DeserializePartitionExpr {
|
||||
#[snafu(source)]
|
||||
source: partition::error::Error,
|
||||
@@ -990,8 +983,7 @@ impl ErrorExt for Error {
|
||||
| Error::MissingInsertBody { .. } => StatusCode::Internal,
|
||||
Error::ExecuteAdminFunction { .. }
|
||||
| Error::EncodeJson { .. }
|
||||
| Error::DeserializePartitionExpr { .. }
|
||||
| Error::SerializePartitionExpr { .. } => StatusCode::Unexpected,
|
||||
| Error::DeserializePartitionExpr { .. } => StatusCode::Unexpected,
|
||||
Error::ViewNotFound { .. }
|
||||
| Error::ViewInfoNotFound { .. }
|
||||
| Error::TableNotFound { .. } => StatusCode::TableNotFound,
|
||||
|
||||
@@ -16,11 +16,9 @@ use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use api::v1::meta::CreateFlowTask as PbCreateFlowTask;
|
||||
use api::v1::{
|
||||
AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
|
||||
Repartition, column_def,
|
||||
AlterDatabaseExpr, AlterTableExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, column_def,
|
||||
};
|
||||
#[cfg(feature = "enterprise")]
|
||||
use api::v1::{
|
||||
@@ -90,9 +88,8 @@ use crate::error::{
|
||||
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
|
||||
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, NotSupportedSnafu,
|
||||
PartitionExprToPbSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
|
||||
SerializePartitionExprSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
|
||||
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
|
||||
ViewAlreadyExistsSnafu,
|
||||
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
|
||||
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
|
||||
};
|
||||
use crate::expr_helper::{self, RepartitionRequest};
|
||||
use crate::statement::StatementExecutor;
|
||||
@@ -1411,56 +1408,20 @@ impl StatementExecutor {
|
||||
.context(InvalidPartitionSnafu)?;
|
||||
|
||||
info!(
|
||||
"Submitting repartition task for table {} (table_id={}), from {} to {} partitions",
|
||||
"Repartition table {} (table_id={}) from {:?} to {:?}, new partition count: {}",
|
||||
table_ref,
|
||||
table_id,
|
||||
from_partition_exprs.len(),
|
||||
from_partition_exprs,
|
||||
into_partition_exprs,
|
||||
new_partition_exprs_len
|
||||
);
|
||||
|
||||
let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
|
||||
let mut json_exprs = Vec::with_capacity(exprs.len());
|
||||
for expr in exprs {
|
||||
json_exprs.push(expr.as_json_str().context(SerializePartitionExprSnafu)?);
|
||||
}
|
||||
Ok(json_exprs)
|
||||
};
|
||||
let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
|
||||
let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
|
||||
let req = SubmitDdlTaskRequest {
|
||||
query_context: query_context.clone(),
|
||||
task: DdlTask::new_alter_table(AlterTableExpr {
|
||||
catalog_name: request.catalog_name.clone(),
|
||||
schema_name: request.schema_name.clone(),
|
||||
table_name: request.table_name.clone(),
|
||||
kind: Some(Kind::Repartition(Repartition {
|
||||
from_partition_exprs: from_partition_exprs_json,
|
||||
into_partition_exprs: into_partition_exprs_json,
|
||||
// TODO(weny): allow passing 'wait' from SQL options or QueryContext
|
||||
wait: true,
|
||||
})),
|
||||
}),
|
||||
};
|
||||
let invalidate_keys = vec![
|
||||
CacheIdent::TableId(table_id),
|
||||
CacheIdent::TableName(TableName::new(
|
||||
request.catalog_name,
|
||||
request.schema_name,
|
||||
request.table_name,
|
||||
)),
|
||||
];
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), req)
|
||||
.await
|
||||
.context(error::ExecuteDdlSnafu)?;
|
||||
|
||||
// Invalidates local cache ASAP.
|
||||
self.cache_invalidator
|
||||
.invalidate(&Context::default(), &invalidate_keys)
|
||||
.await
|
||||
.context(error::InvalidateTableCacheSnafu)?;
|
||||
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
// TODO(weny): Implement the repartition procedure submission.
|
||||
// The repartition procedure infrastructure is not yet fully integrated with the DDL task system.
|
||||
NotSupportedSnafu {
|
||||
feat: "ALTER TABLE REPARTITION",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -2208,6 +2169,7 @@ mod test {
use sql::dialect::GreptimeDbDialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use sqlparser::parser::Parser;

use super::*;
use crate::expr_helper;

@@ -2225,6 +2187,39 @@ mod test {
assert!(!NAME_PATTERN_REG.is_match("#"));
}

#[test]
fn test_partition_expr_equivalence_with_swapped_operands() {
let column_name = "device_id".to_string();
let column_name_and_type =
HashMap::from([(&column_name, ConcreteDataType::int32_datatype())]);
let timezone = Timezone::from_tz_string("UTC").unwrap();
let dialect = GreptimeDbDialect {};

let mut parser = Parser::new(&dialect)
.try_with_sql("device_id < 100")
.unwrap();
let expr_left = parser.parse_expr().unwrap();

let mut parser = Parser::new(&dialect)
.try_with_sql("100 > device_id")
.unwrap();
let expr_right = parser.parse_expr().unwrap();

let partition_left =
convert_one_expr(&expr_left, &column_name_and_type, &timezone).unwrap();
let partition_right =
convert_one_expr(&expr_right, &column_name_and_type, &timezone).unwrap();

assert_eq!(partition_left, partition_right);
assert!([partition_left.clone()].contains(&partition_right));

let mut physical_partition_exprs = vec![partition_left];
let mut logical_partition_exprs = vec![partition_right];
physical_partition_exprs.sort_unstable();
logical_partition_exprs.sort_unstable();
assert_eq!(physical_partition_exprs, logical_partition_exprs);
}

#[tokio::test]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_parse_partitions() {

@@ -185,6 +185,19 @@ impl RestrictedOp {
Self::Or => ParserBinaryOperator::Or,
}
}

fn invert_for_swap(&self) -> Self {
match self {
Self::Eq => Self::Eq,
Self::NotEq => Self::NotEq,
Self::Lt => Self::Gt,
Self::LtEq => Self::GtEq,
Self::Gt => Self::Lt,
Self::GtEq => Self::LtEq,
Self::And => Self::And,
Self::Or => Self::Or,
}
}
}
impl Display for RestrictedOp {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {

@@ -208,6 +221,32 @@ impl PartitionExpr {
op,
rhs: Box::new(rhs),
}
.canonicalize()
}

/// Canonicalize to `Column op Value` form when possible for consistent equality checks.
pub fn canonicalize(self) -> Self {
let lhs = Self::canonicalize_operand(*self.lhs);
let rhs = Self::canonicalize_operand(*self.rhs);
let mut expr = Self {
lhs: Box::new(lhs),
op: self.op,
rhs: Box::new(rhs),
};

if matches!(&*expr.lhs, Operand::Value(_)) && matches!(&*expr.rhs, Operand::Column(_)) {
std::mem::swap(&mut expr.lhs, &mut expr.rhs);
expr.op = expr.op.invert_for_swap();
}

expr
}

fn canonicalize_operand(operand: Operand) -> Operand {
match operand {
Operand::Expr(expr) => Operand::Expr(expr.canonicalize()),
other => other,
}
}

/// Convert [Self] back to sqlparser's [Expr]

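The hunk above is the heart of the canonicalization change: whenever a comparison arrives as `Value op Column`, the operands are swapped and the operator inverted, so equality checks, sorting, and dedup all see one `Column op Value` form. A minimal self-contained sketch of the same idea, using toy stand-ins rather than the crate's `PartitionExpr`, `RestrictedOp`, and `Operand` types:

```rust
#[derive(Debug, Clone, PartialEq)]
enum Operand {
    Column(String),
    Value(i64),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Lt,
    LtEq,
    Gt,
    GtEq,
    Eq,
}

impl Op {
    // Mirror of `invert_for_swap`: the operator that preserves meaning
    // after the two operands trade places.
    fn invert_for_swap(self) -> Self {
        match self {
            Op::Lt => Op::Gt,
            Op::LtEq => Op::GtEq,
            Op::Gt => Op::Lt,
            Op::GtEq => Op::LtEq,
            Op::Eq => Op::Eq,
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
struct Expr {
    lhs: Operand,
    op: Op,
    rhs: Operand,
}

// Rewrite `Value op Column` into the canonical `Column op Value` form.
fn canonicalize(mut e: Expr) -> Expr {
    if matches!(&e.lhs, Operand::Value(_)) && matches!(&e.rhs, Operand::Column(_)) {
        std::mem::swap(&mut e.lhs, &mut e.rhs);
        e.op = e.op.invert_for_swap();
    }
    e
}

fn main() {
    // `100 > device_id` and `device_id < 100` canonicalize to the same expression.
    let swapped = Expr {
        lhs: Operand::Value(100),
        op: Op::Gt,
        rhs: Operand::Column("device_id".into()),
    };
    let plain = Expr {
        lhs: Operand::Column("device_id".into()),
        op: Op::Lt,
        rhs: Operand::Value(100),
    };
    assert_eq!(canonicalize(swapped), canonicalize(plain));
}
```
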
@@ -354,7 +393,7 @@ impl PartitionExpr {

let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
match bound {
PartitionBound::Expr(expr) => Ok(Some(expr)),
PartitionBound::Expr(expr) => Ok(Some(expr.canonicalize())),
_ => Ok(None),
}
}

@@ -494,7 +533,7 @@ mod tests {
.try_as_logical_expr()
.unwrap()
.to_string(),
"Int64(10) < a OR a IS NULL"
"a > Int64(10) OR a IS NULL"
);

// Test Gt with column on LHS

@@ -519,7 +558,7 @@ mod tests {
.try_as_logical_expr()
.unwrap()
.to_string(),
"Int64(10) > a OR a IS NULL"
"a < Int64(10) OR a IS NULL"
);

// Test GtEq with column on LHS

@@ -37,5 +37,4 @@ serde.workspace = true
|
||||
servers.workspace = true
|
||||
snafu.workspace = true
|
||||
store-api.workspace = true
|
||||
table.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -36,12 +36,6 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Repartition procedure is not supported in standalone mode"))]
|
||||
NoSupportRepartitionProcedure {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -51,7 +45,6 @@ impl ErrorExt for Error {
|
||||
match self {
|
||||
Error::OpenMetadataKvBackend { source, .. } => source.status_code(),
|
||||
Error::External { source, .. } => source.status_code(),
|
||||
Error::NoSupportRepartitionProcedure { .. } => StatusCode::Unsupported,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,4 +20,4 @@ pub mod procedure;
|
||||
|
||||
pub use information_extension::StandaloneInformationExtension;
|
||||
pub use metadata::build_metadata_kvbackend;
|
||||
pub use procedure::{StandaloneRepartitionProcedureFactory, build_procedure_manager};
|
||||
pub use procedure::build_procedure_manager;
|
||||
|
||||
@@ -14,19 +14,12 @@

use std::sync::Arc;

use common_error::ext::BoxedError;
use common_meta::ddl::DdlContext;
use common_meta::ddl_manager::RepartitionProcedureFactory;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::state_store::KvStateStore;
use common_procedure::ProcedureManagerRef;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_procedure::{BoxedProcedure, ProcedureManagerRef};
use store_api::storage::TableId;
use table::table_name::TableName;

use crate::error::NoSupportRepartitionProcedureSnafu;

/// Builds the procedure manager.
pub fn build_procedure_manager(
@@ -50,32 +43,3 @@ pub fn build_procedure_manager(
None,
))
}

/// No-op implementation of [`RepartitionProcedureFactory`] for standalone mode.
///
/// In standalone deployments, repartition operations are not supported, so
/// this factory always returns a `NoSupportRepartitionProcedure` error
/// from [`RepartitionProcedureFactory::create`] and performs no registration
/// work in [`RepartitionProcedureFactory::register_loaders`].
pub struct StandaloneRepartitionProcedureFactory;

impl RepartitionProcedureFactory for StandaloneRepartitionProcedureFactory {
fn create(
&self,
_ddl_ctx: &DdlContext,
_table_name: TableName,
_table_id: TableId,
_from_exprs: Vec<String>,
_to_exprs: Vec<String>,
) -> std::result::Result<BoxedProcedure, BoxedError> {
Err(BoxedError::new(NoSupportRepartitionProcedureSnafu.build()))
}

fn register_loaders(
&self,
_ddl_ctx: &DdlContext,
_procedure_manager: &ProcedureManagerRef,
) -> std::result::Result<(), BoxedError> {
Ok(())
}
}

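The doc comment above describes a plain no-op pattern: `create` always errors because standalone mode has no repartition procedure, and `register_loaders` has nothing to register. A self-contained sketch of that pattern follows; the RepartitionFactory trait, Unsupported error, and NoopRepartitionFactory names here are illustrative stand-ins, not the real common_meta API, which works with a DdlContext, table metadata, and boxed procedures.

// Sketch only: simplified stand-in trait and error type.
#[derive(Debug)]
struct Unsupported(&'static str);

trait RepartitionFactory {
    fn create(&self, table: &str) -> Result<String, Unsupported>;
    fn register_loaders(&self) -> Result<(), Unsupported>;
}

struct NoopRepartitionFactory;

impl RepartitionFactory for NoopRepartitionFactory {
    // Standalone mode has no repartition procedure, so creation always fails.
    fn create(&self, _table: &str) -> Result<String, Unsupported> {
        Err(Unsupported("repartition is not supported in standalone mode"))
    }

    // Nothing to register for a procedure that can never be created.
    fn register_loaders(&self) -> Result<(), Unsupported> {
        Ok(())
    }
}

fn main() {
    let factory = NoopRepartitionFactory;
    assert!(factory.create("alter_repartition_table").is_err());
    assert!(factory.register_loaders().is_ok());
}
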
@@ -52,7 +52,6 @@ use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use servers::grpc::GrpcOptions;
use snafu::ResultExt;
use standalone::StandaloneRepartitionProcedureFactory;
use standalone::options::StandaloneOptions;

use crate::test_util::{self, StorageType, TestGuard, create_tmp_dir_and_datanode_opts};
@@ -216,7 +215,6 @@ impl GreptimeDbStandaloneBuilder {
flow_id_sequence,
));

let repartition_procedure_factory = Arc::new(StandaloneRepartitionProcedureFactory);
let ddl_manager = Arc::new(
DdlManager::try_new(
DdlContext {
@@ -231,7 +229,6 @@ impl GreptimeDbStandaloneBuilder {
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),
repartition_procedure_factory,
register_procedure_loaders,
)
.unwrap(),

@@ -1,101 +0,0 @@
CREATE TABLE alter_repartition_table(
device_id INT,
area STRING,
ty STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(device_id)
) PARTITION ON COLUMNS (device_id, area) (
device_id < 100,
device_id >= 100 AND device_id < 200,
device_id >= 200
);

Affected Rows: 0

ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);

Affected Rows: 0

SHOW CREATE TABLE alter_repartition_table;

+-------------------------+--------------------------------------------------------+
| Table | Create Table |
+-------------------------+--------------------------------------------------------+
| alter_repartition_table | CREATE TABLE IF NOT EXISTS "alter_repartition_table" ( |
| | "device_id" INT NULL, |
| | "area" STRING NULL, |
| | "ty" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("device_id") |
| | ) |
| | PARTITION ON COLUMNS ("device_id", "area") ( |
| | device_id < 100 AND area < 'South', |
| | device_id >= 100 AND device_id < 200, |
| | device_id >= 200, |
| | device_id < 100 AND area >= 'South' |
| | ) |
| | ENGINE=mito |
| | |
+-------------------------+--------------------------------------------------------+

ALTER TABLE alter_repartition_table MERGE PARTITION (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);

Affected Rows: 0

SHOW CREATE TABLE alter_repartition_table;

+-------------------------+------------------------------------------------------------------------------+
| Table | Create Table |
+-------------------------+------------------------------------------------------------------------------+
| alter_repartition_table | CREATE TABLE IF NOT EXISTS "alter_repartition_table" ( |
| | "device_id" INT NULL, |
| | "area" STRING NULL, |
| | "ty" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("device_id") |
| | ) |
| | PARTITION ON COLUMNS ("device_id", "area") ( |
| | device_id < 100 AND area < 'South' OR device_id < 100 AND area >= 'South', |
| | device_id >= 100 AND device_id < 200, |
| | device_id >= 200 |
| | ) |
| | ENGINE=mito |
| | |
+-------------------------+------------------------------------------------------------------------------+

-- FIXME(weny): Object store is not configured for the test environment,
-- so staging manifest may not be applied in some cases.
-- invalid: empty source clause
ALTER TABLE alter_repartition_table REPARTITION () INTO (
device_id < 100
);

Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected expression inside REPARTITION clause, found: )

-- invalid: more than one INTO clause
ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
device_id < 50
), (
device_id >= 50
) INTO (
device_id >= 50
);

Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected end of REPARTITION clause, found: ,

DROP TABLE alter_repartition_table;

Affected Rows: 0

66
tests/cases/standalone/common/alter/repartition.result
Normal file
66
tests/cases/standalone/common/alter/repartition.result
Normal file
@@ -0,0 +1,66 @@
CREATE TABLE alter_repartition_table(
device_id INT,
area STRING,
ty STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(device_id)
) PARTITION ON COLUMNS (device_id, area) (
device_id < 100,
device_id >= 100 AND device_id < 200,
device_id >= 200
);

Affected Rows: 0

-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);

Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION

-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);

Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION

-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table MERGE PARTITION (
device_id < 100,
device_id >= 100 AND device_id < 200
);

Error: 1001(Unsupported), Not supported: ALTER TABLE REPARTITION

-- invalid: empty source clause
ALTER TABLE alter_repartition_table REPARTITION () INTO (
device_id < 100
);

Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected expression inside REPARTITION clause, found: )

-- invalid: more than one INTO clause
ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
device_id < 50
), (
device_id >= 50
) INTO (
device_id >= 50
);

Error: 2000(InvalidSyntax), Invalid SQL syntax: sql parser error: Expected end of REPARTITION clause, found: ,

DROP TABLE alter_repartition_table;

Affected Rows: 0

@@ -10,6 +10,7 @@ CREATE TABLE alter_repartition_table(
device_id >= 200
);

-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100
) INTO (
@@ -17,17 +18,19 @@ ALTER TABLE alter_repartition_table REPARTITION (
device_id < 100 AND area >= 'South'
);

SHOW CREATE TABLE alter_repartition_table;

ALTER TABLE alter_repartition_table MERGE PARTITION (
-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table SPLIT PARTITION (
device_id < 100
) INTO (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
);

SHOW CREATE TABLE alter_repartition_table;

-- FIXME(weny): Object store is not configured for the test environment,
-- so staging manifest may not be applied in some cases.
-- valid grammar, currently not implemented
ALTER TABLE alter_repartition_table MERGE PARTITION (
device_id < 100,
device_id >= 100 AND device_id < 200
);

-- invalid: empty source clause
ALTER TABLE alter_repartition_table REPARTITION () INTO (
@@ -24,40 +24,46 @@ VALUES
Affected Rows: 10

-- should not fail with mismatch timezone
SELECT count(*) FROM (
-- SQLNESS REPLACE \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z NOW
SELECT
now()
FROM
ngx_access_log);
ngx_access_log;

+----------+
| count(*) |
+----------+
| 10 |
+----------+
+--------------------------------+
| now() |
+--------------------------------+
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
| NOW |
+--------------------------------+

-- SQLNESS REPLACE TimestampNanosecond\(\d+ TimestampNanosecond(NOW
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT count(*) FROM (
SELECT
EXPLAIN SELECT
now()
FROM
ngx_access_log);
ngx_access_log;

+---------------+-----------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) |
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
| | Projection: TimestampNanosecond(NOW, Some("+00:00")) AS now() |
| | TableScan: ngx_access_log |
| | ]] |
| physical_plan | CooperativeExec |
+---------------+-------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: TimestampNanosecond(NOW, Some("+00:00")) AS now() |
| | TableScan: ngx_access_log |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------+
| | |
+---------------+-------------------------------------------------------------------------------+

DROP TABLE ngx_access_log;


@@ -21,19 +21,18 @@ VALUES
("client10", "KR", "2022-01-01 00:00:09");

-- should not fail with mismatch timezone
SELECT count(*) FROM (
-- SQLNESS REPLACE \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z NOW
SELECT
now()
FROM
ngx_access_log);
ngx_access_log;

-- SQLNESS REPLACE TimestampNanosecond\(\d+ TimestampNanosecond(NOW
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
EXPLAIN SELECT count(*) FROM (
SELECT
EXPLAIN SELECT
now()
FROM
ngx_access_log);
ngx_access_log;

DROP TABLE ngx_access_log;