From 25687bb282107278c4100fb7050d86908027816a Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 20 Jan 2026 17:26:53 +0800 Subject: [PATCH] feat: add ddl timeout/wait options, repartition `WITH` parsing, meta-client startup refactor (#7589) * feat: add ddl request timeouts and unify meta client startup Signed-off-by: WenyXu * feat: omplement ALTER TABLE repartition DDL options parsing Signed-off-by: WenyXu * test: add sqlness tests Signed-off-by: WenyXu * test: add unit tests Signed-off-by: WenyXu * feat: pass timeout argument to procedure Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * chore: refine comments Signed-off-by: WenyXu * test: assert timeout Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * chore: update proto Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 3 +- Cargo.toml | 2 +- src/cli/src/metadata/repair.rs | 2 +- src/common/frontend/src/selector.rs | 2 +- src/common/grpc/src/channel_manager.rs | 8 +- src/common/meta/src/ddl_manager.rs | 43 +++- src/common/meta/src/rpc/ddl.rs | 27 ++ src/flow/src/batching_mode/frontend_client.rs | 4 +- src/meta-client/Cargo.toml | 1 + src/meta-client/examples/meta_client.rs | 2 +- src/meta-client/src/client.rs | 142 ++++++----- src/meta-client/src/client/ask_leader.rs | 41 ++- src/meta-client/src/client/cluster.rs | 40 +-- src/meta-client/src/client/heartbeat.rs | 60 ++--- src/meta-client/src/client/procedure.rs | 233 +++++++++++++++--- src/meta-client/src/lib.rs | 12 +- src/meta-srv/src/metasrv/builder.rs | 2 +- src/meta-srv/src/mocks.rs | 2 +- src/meta-srv/src/service/procedure.rs | 5 +- src/operator/Cargo.toml | 1 + src/operator/src/expr_helper.rs | 17 +- src/operator/src/statement/comment.rs | 12 +- src/operator/src/statement/ddl.rs | 186 +++++++++----- src/sql/src/parsers/alter_parser.rs | 52 +++- src/sql/src/parsers/create_parser.rs | 22 +- src/sql/src/parsers/utils.rs | 23 +- src/sql/src/statements/alter.rs | 15 +- src/sql/src/statements/option_map.rs | 6 + src/sql/src/util.rs | 4 + src/table/src/requests.rs | 7 +- tests-integration/src/cluster.rs | 2 +- .../repartition/repartition.result | 35 +++ .../distributed/repartition/repartition.sql | 25 ++ 33 files changed, 716 insertions(+), 322 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5477d40cf8..3d28a03edb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5725,7 +5725,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1353b0ada9e17890c7ba0e402ba29b2b57816ff1#1353b0ada9e17890c7ba0e402ba29b2b57816ff1" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7755821c27295f4ea34a52fab6f993a55d8f9377#7755821c27295f4ea34a52fab6f993a55d8f9377" dependencies = [ "prost 0.14.1", "prost-types 0.14.1", @@ -7772,6 +7772,7 @@ dependencies = [ "meta-srv", "rand 0.9.1", "serde", + "session", "snafu 0.8.6", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 077e22abe0..3f0342f392 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,7 +152,7 @@ etcd-client = { version = "0.17", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1353b0ada9e17890c7ba0e402ba29b2b57816ff1" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7755821c27295f4ea34a52fab6f993a55d8f9377" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/cli/src/metadata/repair.rs b/src/cli/src/metadata/repair.rs index b864ab3857..6012c267c9 100644 --- a/src/cli/src/metadata/repair.rs +++ b/src/cli/src/metadata/repair.rs @@ -96,7 +96,7 @@ impl RepairLogicalTablesCommand { self.validate().map_err(BoxedError::new)?; let kv_backend = self.store.build().await?; let node_client_channel_config = ChannelConfig::new() - .timeout(Duration::from_secs(self.client_timeout_secs)) + .timeout(Some(Duration::from_secs(self.client_timeout_secs))) .connect_timeout(Duration::from_secs(self.client_connect_timeout_secs)); let node_manager = Arc::new(NodeClients::new(node_client_channel_config)); diff --git a/src/common/frontend/src/selector.rs b/src/common/frontend/src/selector.rs index f2dc337cc2..804169d1dd 100644 --- a/src/common/frontend/src/selector.rs +++ b/src/common/frontend/src/selector.rs @@ -103,7 +103,7 @@ impl MetaClientSelector { pub fn new(meta_client: MetaClientRef) -> Self { let cfg = ChannelConfig::new() .connect_timeout(Duration::from_secs(30)) - .timeout(Duration::from_secs(30)); + .timeout(Some(Duration::from_secs(30))); let channel_manager = ChannelManager::with_config(cfg, None); Self { meta_client, diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index a60604da94..1be0e22d1a 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -413,8 +413,8 @@ impl ChannelConfig { } /// A timeout to each request. - pub fn timeout(mut self, timeout: Duration) -> Self { - self.timeout = Some(timeout); + pub fn timeout(mut self, timeout: Option) -> Self { + self.timeout = timeout; self } @@ -660,7 +660,7 @@ mod tests { ); let cfg = default_cfg - .timeout(Duration::from_secs(3)) + .timeout(Some(Duration::from_secs(3))) .connect_timeout(Duration::from_secs(5)) .concurrency_limit(6) .rate_limit(5, Duration::from_secs(1)) @@ -713,7 +713,7 @@ mod tests { #[test] fn test_build_endpoint() { let config = ChannelConfig::new() - .timeout(Duration::from_secs(3)) + .timeout(Some(Duration::from_secs(3))) .connect_timeout(Duration::from_secs(5)) .concurrency_limit(6) .rate_limit(5, Duration::from_secs(1)) diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 7edaeb7c10..4104f211f1 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -170,6 +170,25 @@ pub trait RepartitionProcedureFactory: Send + Sync { ) -> std::result::Result<(), BoxedError>; } +/// The options for DDL tasks. +/// +/// Note: These options may not be utilized by all procedures. +/// At present, they are specifically applied in `RepartitionProcedure`. +#[derive(Debug, Clone, Copy)] +pub struct DdlOptions { + /// The timeout will be passed to the procedure. + /// + /// Note: Each procedure may implement its own timeout handling mechanism. + pub timeout: Duration, + /// The flag that controls whether to wait for the procedure to complete. + /// + /// If wait is `true`, the procedure will wait for completion(success or failure) and the result will be returned. + /// Otherwise, the procedure will be submitted and return the [ProcedureId](common_procedure::ProcedureId) immediately. + /// + /// Note: The value of `wait` is independent of the `timeout` option. If a procedure ignores the `timeout` and `wait` is set to true, the operation returns until the procedure completes. + pub wait: bool, +} + impl DdlManager { /// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered. pub fn try_new( @@ -265,8 +284,9 @@ impl DdlManager { Repartition { from_partition_exprs, into_partition_exprs, - wait, }: Repartition, + wait: bool, + timeout: Duration, ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); @@ -278,8 +298,7 @@ impl DdlManager { table_id, from_partition_exprs, into_partition_exprs, - // TODO(weny): get timeout from the proto. - None, + Some(timeout), ) .context(CreateRepartitionProcedureSnafu)?; let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -298,6 +317,7 @@ impl DdlManager { &self, table_id: TableId, alter_table_task: AlterTableTask, + ddl_options: DdlOptions, ) -> Result<(ProcedureId, Option)> { // make alter_table_task mutable so we can call .take() on its field let mut alter_table_task = alter_table_task; @@ -311,7 +331,13 @@ impl DdlManager { alter_table_task.alter_table.table_name, ); return self - .submit_repartition_task(table_id, table_name, repartition) + .submit_repartition_task( + table_id, + table_name, + repartition, + ddl_options.wait, + ddl_options.timeout, + ) .await; } @@ -566,6 +592,10 @@ impl DdlManager { .map(TracingContext::from_w3c) .unwrap_or_else(TracingContext::from_current_span) .attach(tracing::info_span!("DdlManager::submit_ddl_task")); + let ddl_options = DdlOptions { + wait: request.wait, + timeout: request.timeout, + }; async move { debug!("Submitting Ddl task: {:?}", request.task); match request.task { @@ -574,7 +604,7 @@ impl DdlManager { } DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await, AlterTable(alter_table_task) => { - handle_alter_table_task(self, alter_table_task).await + handle_alter_table_task(self, alter_table_task, ddl_options).await } TruncateTable(truncate_table_task) => { handle_truncate_table_task(self, truncate_table_task).await @@ -660,6 +690,7 @@ async fn handle_truncate_table_task( async fn handle_alter_table_task( ddl_manager: &DdlManager, alter_table_task: AlterTableTask, + ddl_options: DdlOptions, ) -> Result { let table_ref = alter_table_task.table_ref(); @@ -692,7 +723,7 @@ async fn handle_alter_table_task( ); let (id, _) = ddl_manager - .submit_alter_table_task(table_id, alter_table_task) + .submit_alter_table_task(table_id, alter_table_task, ddl_options) .await?; info!("Table: {table_id} is altered via procedure_id {id:?}"); diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 9d37bf1f3c..ddb40ee5ec 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -17,6 +17,7 @@ pub mod trigger; use std::collections::{HashMap, HashSet}; use std::result; +use std::time::Duration; use api::helper::{from_pb_time_ranges, to_pb_time_ranges}; use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind; @@ -293,9 +294,33 @@ impl TryFrom for DdlTask { #[derive(Clone)] pub struct SubmitDdlTaskRequest { pub query_context: QueryContextRef, + pub wait: bool, + pub timeout: Duration, pub task: DdlTask, } +impl SubmitDdlTaskRequest { + /// The default constructor for [`SubmitDdlTaskRequest`]. + pub fn new(query_context: QueryContextRef, task: DdlTask) -> Self { + Self { + query_context, + wait: Self::default_wait(), + timeout: Self::default_timeout(), + task, + } + } + + /// The default timeout for a DDL task. + pub fn default_timeout() -> Duration { + Duration::from_secs(60) + } + + /// The default wait for a DDL task. + pub fn default_wait() -> bool { + true + } +} + impl TryFrom for PbDdlTaskRequest { type Error = error::Error; @@ -346,6 +371,8 @@ impl TryFrom for PbDdlTaskRequest { Ok(Self { header: None, query_context: Some((*request.query_context).clone().into()), + timeout_secs: request.timeout.as_secs() as u32, + wait: request.wait, task: Some(task), }) } diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index d07c92efb2..3bf449b1b2 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -149,7 +149,7 @@ impl FrontendClient { chnl_mgr: { let cfg = ChannelConfig::new() .connect_timeout(batch_opts.grpc_conn_timeout) - .timeout(batch_opts.query_timeout); + .timeout(Some(batch_opts.query_timeout)); let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone()) .context(InvalidClientConfigSnafu)?; @@ -550,7 +550,7 @@ mod tests { .is_ok() ); - let meta_client = Arc::new(MetaClient::default()); + let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)); let client = FrontendClient::from_meta_client( meta_client, None, diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 70a3fb581a..cac33a0abb 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -30,6 +30,7 @@ tonic.workspace = true datatypes.workspace = true futures.workspace = true meta-srv = { workspace = true, features = ["mock"] } +session.workspace = true tower.workspace = true tracing = "0.1" tracing-subscriber.workspace = true diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index e1e11d9750..5db13c9658 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -33,7 +33,7 @@ fn main() { async fn run() { let id = 2000u64; let config = ChannelConfig::new() - .timeout(Duration::from_secs(3)) + .timeout(Some(Duration::from_secs(3))) .connect_timeout(Duration::from_secs(5)) .tcp_nodelay(true); let channel_manager = ChannelManager::with_config(config, None); diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 0d0bcecee5..00cd09f1f0 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -23,6 +23,7 @@ mod util; use std::fmt::Debug; use std::sync::Arc; +use std::time::Duration; use api::v1::meta::{ MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role, @@ -63,6 +64,7 @@ use snafu::{OptionExt, ResultExt}; use store::Client as StoreClient; pub use self::heartbeat::{HeartbeatSender, HeartbeatStream}; +use crate::client::ask_leader::{LeaderProviderFactoryImpl, LeaderProviderFactoryRef}; use crate::error::{ ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu, Result, @@ -73,6 +75,7 @@ pub type Id = u64; const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3; const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3; const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3; +const DEFAULT_DDL_TIMEOUT: Duration = Duration::from_secs(10); #[derive(Clone, Debug, Default)] pub struct MetaClientBuilder { @@ -85,6 +88,8 @@ pub struct MetaClientBuilder { region_follower: Option, channel_manager: Option, ddl_channel_manager: Option, + /// The default ddl timeout for each request. + ddl_timeout: Option, heartbeat_channel_manager: Option, } @@ -165,6 +170,13 @@ impl MetaClientBuilder { } } + pub fn ddl_timeout(self, timeout: Duration) -> Self { + Self { + ddl_timeout: Some(timeout), + ..self + } + } + pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self { Self { heartbeat_channel_manager: Some(channel_manager), @@ -180,62 +192,60 @@ impl MetaClientBuilder { } pub fn build(self) -> MetaClient { - let mut client = if let Some(mgr) = self.channel_manager { - MetaClient::with_channel_manager(self.id, mgr) - } else { - MetaClient::new(self.id) - }; + let mgr = self.channel_manager.unwrap_or_default(); + let heartbeat_channel_manager = self + .heartbeat_channel_manager + .clone() + .unwrap_or_else(|| mgr.clone()); - let mgr = client.channel_manager.clone(); - - if self.enable_heartbeat { + let heartbeat = self.enable_heartbeat.then(|| { if self.heartbeat_channel_manager.is_some() { info!("Enable heartbeat channel using the heartbeat channel manager."); } - let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone()); - client.heartbeat = Some(HeartbeatClient::new( - self.id, - self.role, - mgr, - DEFAULT_ASK_LEADER_MAX_RETRY, - )); - } - if self.enable_store { - client.store = Some(StoreClient::new(self.id, self.role, mgr.clone())); - } - - if self.enable_procedure { + HeartbeatClient::new(self.id, self.role, heartbeat_channel_manager.clone()) + }); + let store = self + .enable_store + .then(|| StoreClient::new(self.id, self.role, mgr.clone())); + let procedure = self.enable_procedure.then(|| { let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone()); - client.procedure = Some(ProcedureClient::new( + ProcedureClient::new( self.id, self.role, mgr, DEFAULT_SUBMIT_DDL_MAX_RETRY, - )); - } + self.ddl_timeout.unwrap_or(DEFAULT_DDL_TIMEOUT), + ) + }); + let cluster = self + .enable_access_cluster_info + .then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY)); + let region_follower = self.region_follower.clone(); - if self.enable_access_cluster_info { - client.cluster = Some(ClusterClient::new( + MetaClient { + id: self.id, + channel_manager: mgr.clone(), + leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new( self.id, self.role, - mgr, - DEFAULT_CLUSTER_CLIENT_MAX_RETRY, - )) + DEFAULT_ASK_LEADER_MAX_RETRY, + heartbeat_channel_manager, + )), + heartbeat, + store, + procedure, + cluster, + region_follower, } - - if let Some(region_follower) = self.region_follower { - client.region_follower = Some(region_follower); - } - - client } } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct MetaClient { id: Id, channel_manager: ChannelManager, + leader_provider_factory: LeaderProviderFactoryRef, heartbeat: Option, store: Option, procedure: Option, @@ -243,6 +253,26 @@ pub struct MetaClient { region_follower: Option, } +impl MetaClient { + pub fn new(id: Id, role: Role) -> Self { + Self { + id, + channel_manager: ChannelManager::default(), + leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new( + id, + role, + DEFAULT_ASK_LEADER_MAX_RETRY, + ChannelManager::default(), + )), + heartbeat: None, + store: None, + procedure: None, + cluster: None, + region_follower: None, + } + } +} + pub type RegionFollowerClientRef = Arc; /// A trait for clients that can manage region followers. @@ -498,21 +528,6 @@ fn decode_stats(kv: KeyValue) -> MetaResult { } impl MetaClient { - pub fn new(id: Id) -> Self { - Self { - id, - ..Default::default() - } - } - - pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self { - Self { - id, - channel_manager, - ..Default::default() - } - } - pub async fn start(&mut self, urls: A) -> Result<()> where U: AsRef, @@ -520,29 +535,10 @@ impl MetaClient { { info!("MetaClient channel config: {:?}", self.channel_config()); - if let Some(client) = &mut self.region_follower { - let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::>(); - client.start(&urls).await?; - info!("Region follower client started"); - } - if let Some(client) = &mut self.heartbeat { - client.start(urls.clone()).await?; - info!("Heartbeat client started"); - } - if let Some(client) = &mut self.store { - client.start(urls.clone()).await?; - info!("Store client started"); - } - if let Some(client) = &mut self.procedure { - client.start(urls.clone()).await?; - info!("DDL client started"); - } - if let Some(client) = &mut self.cluster { - client.start(urls).await?; - info!("Cluster client started"); - } + let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::>(); + let leader_provider = self.leader_provider_factory.create(&urls); - Ok(()) + self.start_with(leader_provider, urls).await } /// Start the client with a [LeaderProvider] and other Metasrv peers' addresses. diff --git a/src/meta-client/src/client/ask_leader.rs b/src/meta-client/src/client/ask_leader.rs index e34d0dfedf..9a41559e14 100644 --- a/src/meta-client/src/client/ask_leader.rs +++ b/src/meta-client/src/client/ask_leader.rs @@ -34,7 +34,7 @@ use crate::error::Result; pub type LeaderProviderRef = Arc; -/// Provide [MetaClient] a Metasrv leader's address. +/// Provide [`MetaClient`] a Metasrv leader's address. #[async_trait] pub trait LeaderProvider: Debug + Send + Sync { /// Get the leader of the Metasrv. If it returns `None`, or the leader is outdated, @@ -45,6 +45,13 @@ pub trait LeaderProvider: Debug + Send + Sync { async fn ask_leader(&self) -> Result; } +pub type LeaderProviderFactoryRef = Arc; + +/// A factory for creating [`LeaderProvider`] instances. +pub trait LeaderProviderFactory: Send + Sync + Debug { + fn create(&self, peers: &[&str]) -> LeaderProviderRef; +} + #[derive(Debug)] struct LeadershipGroup { leader: Option, @@ -206,3 +213,35 @@ impl LeaderProvider for AskLeader { self.ask_leader().await } } + +/// A factory for creating [`LeaderProvider`] instances. +#[derive(Clone, Debug)] +pub struct LeaderProviderFactoryImpl { + id: Id, + role: Role, + max_retry: usize, + channel_manager: ChannelManager, +} + +impl LeaderProviderFactoryImpl { + pub fn new(id: Id, role: Role, max_retry: usize, channel_manager: ChannelManager) -> Self { + Self { + id, + role, + max_retry, + channel_manager, + } + } +} + +impl LeaderProviderFactory for LeaderProviderFactoryImpl { + fn create(&self, peers: &[&str]) -> LeaderProviderRef { + Arc::new(AskLeader::new( + self.id, + self.role, + peers.iter().map(|p| p.to_string()).collect::>(), + self.channel_manager.clone(), + self.max_retry, + )) + } +} diff --git a/src/meta-client/src/client/cluster.rs b/src/meta-client/src/client/cluster.rs index 5386ee8a57..8da45aaa7a 100644 --- a/src/meta-client/src/client/cluster.rs +++ b/src/meta-client/src/client/cluster.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use api::greptime_proto::v1; use api::v1::meta::cluster_client::ClusterClient; -use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role}; +use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader}; use common_error::ext::BoxedError; use common_grpc::channel_manager::ChannelManager; use common_meta::error::{ @@ -37,8 +37,7 @@ use tonic::Status; use tonic::codec::CompressionEncoding; use tonic::transport::Channel; -use crate::client::ask_leader::AskLeader; -use crate::client::{Id, LeaderProviderRef, util}; +use crate::client::{LeaderProviderRef, util}; use crate::error::{ ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu, ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu, @@ -50,10 +49,8 @@ pub struct Client { } impl Client { - pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { + pub fn new(channel_manager: ChannelManager, max_retry: usize) -> Self { let inner = Arc::new(RwLock::new(Inner { - id, - role, channel_manager, leader_provider: None, max_retry, @@ -62,15 +59,6 @@ impl Client { Self { inner } } - pub async fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - let mut inner = self.inner.write().await; - inner.start(urls) - } - /// Start the client with a [LeaderProvider]. pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> { let mut inner = self.inner.write().await; @@ -147,8 +135,6 @@ impl KvBackend for Client { #[derive(Debug)] struct Inner { - id: Id, - role: Role, channel_manager: ChannelManager, leader_provider: Option, max_retry: usize, @@ -166,26 +152,6 @@ impl Inner { Ok(()) } - fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - let peers = urls - .as_ref() - .iter() - .map(|url| url.as_ref().to_string()) - .collect::>(); - let ask_leader = AskLeader::new( - self.id, - self.role, - peers, - self.channel_manager.clone(), - self.max_retry, - ); - self.start_with(Arc::new(ask_leader)) - } - fn make_client(&self, addr: impl AsRef) -> Result> { let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?; diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 8171289a26..4cdc2cf0f6 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -30,7 +30,6 @@ use tonic::Streaming; use tonic::codec::CompressionEncoding; use tonic::transport::Channel; -use crate::client::ask_leader::AskLeader; use crate::client::{Id, LeaderProviderRef}; use crate::error; use crate::error::{InvalidResponseHeaderSnafu, Result}; @@ -149,25 +148,11 @@ pub struct Client { } impl Client { - pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { - let inner = Arc::new(RwLock::new(Inner::new( - id, - role, - channel_manager, - max_retry, - ))); + pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager))); Self { inner } } - pub async fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - let mut inner = self.inner.write().await; - inner.start(urls) - } - /// Start the client with a [LeaderProvider]. pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> { let mut inner = self.inner.write().await; @@ -194,17 +179,15 @@ struct Inner { role: Role, channel_manager: ChannelManager, leader_provider: Option, - max_retry: usize, } impl Inner { - fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { + fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { Self { id, role, channel_manager, leader_provider: None, - max_retry, } } @@ -219,26 +202,6 @@ impl Inner { Ok(()) } - fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - let peers = urls - .as_ref() - .iter() - .map(|url| url.as_ref().to_string()) - .collect::>(); - let ask_leader = AskLeader::new( - self.id, - self.role, - peers, - self.channel_manager.clone(), - self.max_retry, - ); - self.start_with(Arc::new(ask_leader)) - } - async fn ask_leader(&self) -> Result { let Some(leader_provider) = self.leader_provider.as_ref() else { return error::IllegalGrpcClientStateSnafu { @@ -332,15 +295,20 @@ impl Inner { #[cfg(test)] mod test { use super::*; + use crate::client::AskLeader; #[tokio::test] async fn test_already_start() { - let mut client = Client::new(0, Role::Datanode, ChannelManager::default(), 3); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) - .await - .unwrap(); - let res = client.start(&["127.0.0.1:1002"]).await; + let client = Client::new(0, Role::Datanode, ChannelManager::default()); + let leader_provider = Arc::new(AskLeader::new( + 0, + Role::Datanode, + vec!["127.0.0.1:1000".to_string(), "127.0.0.1:1001".to_string()], + ChannelManager::default(), + 3, + )); + client.start_with(leader_provider.clone()).await.unwrap(); + let res = client.start_with(leader_provider).await; assert!(res.is_err()); assert!(matches!( res.err(), diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index 7b05b188aa..0a4c6bde88 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -27,11 +27,10 @@ use common_telemetry::tracing_context::TracingContext; use common_telemetry::{error, info, warn}; use snafu::{ResultExt, ensure}; use tokio::sync::RwLock; -use tonic::Status; use tonic::codec::CompressionEncoding; use tonic::transport::Channel; +use tonic::{Request, Status}; -use crate::client::ask_leader::AskLeader; use crate::client::{Id, LeaderProviderRef, util}; use crate::error; use crate::error::Result; @@ -42,27 +41,25 @@ pub struct Client { } impl Client { - pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self { + pub fn new( + id: Id, + role: Role, + channel_manager: ChannelManager, + max_retry: usize, + timeout: Duration, + ) -> Self { let inner = Arc::new(RwLock::new(Inner { id, role, channel_manager, leader_provider: None, max_retry, + timeout, })); Self { inner } } - pub async fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - let mut inner = self.inner.write().await; - inner.start(urls) - } - /// Start the client with a [LeaderProvider]. pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> { let mut inner = self.inner.write().await; @@ -117,6 +114,8 @@ struct Inner { channel_manager: ChannelManager, leader_provider: Option, max_retry: usize, + /// Request timeout. + timeout: Duration, } impl Inner { @@ -131,26 +130,6 @@ impl Inner { Ok(()) } - fn start(&mut self, urls: A) -> Result<()> - where - U: AsRef, - A: AsRef<[U]>, - { - let peers = urls - .as_ref() - .iter() - .map(|url| url.as_ref().to_string()) - .collect::>(); - let ask_leader = AskLeader::new( - self.id, - self.role, - peers, - self.channel_manager.clone(), - self.max_retry, - ); - self.start_with(Arc::new(ask_leader)) - } - fn make_client(&self, addr: impl AsRef) -> Result> { let channel = self .channel_manager @@ -209,7 +188,7 @@ impl Inner { times += 1; continue; } else { - error!("An error occurred in gRPC, status: {status}"); + error!("An error occurred in gRPC, status: {status:?}"); return Err(error::Error::from(status)); } } @@ -250,7 +229,8 @@ impl Inner { self.with_retry( "migrate region", move |mut client| { - let req = req.clone(); + let mut req = Request::new(req.clone()); + req.set_timeout(self.timeout); async move { client.migrate(req).await.map(|res| res.into_inner()) } }, @@ -270,7 +250,8 @@ impl Inner { self.with_retry( "reconcile", move |mut client| { - let req = req.clone(); + let mut req = Request::new(req.clone()); + req.set_timeout(self.timeout); async move { client.reconcile(req).await.map(|res| res.into_inner()) } }, @@ -294,7 +275,8 @@ impl Inner { self.with_retry( "query procedure state", move |mut client| { - let req = req.clone(); + let mut req = Request::new(req.clone()); + req.set_timeout(self.timeout); async move { client.query(req).await.map(|res| res.into_inner()) } }, @@ -309,11 +291,13 @@ impl Inner { self.role, TracingContext::from_current_span().to_w3c(), ); + let timeout = Duration::from_secs(req.timeout_secs.into()); self.with_retry( "submit ddl task", move |mut client| { - let req = req.clone(); + let mut req = Request::new(req.clone()); + req.set_timeout(timeout); async move { client.ddl(req).await.map(|res| res.into_inner()) } }, |resp: &DdlTaskResponse| &resp.header, @@ -332,7 +316,8 @@ impl Inner { self.with_retry( "list procedure", move |mut client| { - let req = req.clone(); + let mut req = Request::new(req.clone()); + req.set_timeout(self.timeout); async move { client.details(req).await.map(|res| res.into_inner()) } }, |resp: &ProcedureDetailResponse| &resp.header, @@ -340,3 +325,175 @@ impl Inner { .await } } + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use api::v1::meta::heartbeat_server::{Heartbeat, HeartbeatServer}; + use api::v1::meta::procedure_service_server::{ProcedureService, ProcedureServiceServer}; + use api::v1::meta::{ + AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, HeartbeatRequest, + HeartbeatResponse, MigrateRegionRequest, MigrateRegionResponse, Peer, + ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, + QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role, + }; + use async_trait::async_trait; + use common_error::status_code::StatusCode; + use common_meta::rpc::ddl::{CommentObjectType, CommentOnTask, DdlTask, SubmitDdlTaskRequest}; + use common_telemetry::common_error::ext::ErrorExt; + use common_telemetry::info; + use session::context::QueryContext; + use tokio::net::TcpListener; + use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; + use tonic::codec::CompressionEncoding; + use tonic::{Request, Response, Status}; + + use crate::client::MetaClientBuilder; + + #[derive(Clone)] + struct MockHeartbeat { + leader_addr: String, + } + + #[async_trait] + impl Heartbeat for MockHeartbeat { + type HeartbeatStream = ReceiverStream>; + + async fn heartbeat( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented( + "heartbeat stream is not used in this test", + )) + } + + async fn ask_leader( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(AskLeaderResponse { + header: Some(ResponseHeader { + protocol_version: 0, + error: None, + }), + leader: Some(Peer { + id: 1, + addr: self.leader_addr.clone(), + }), + })) + } + } + + #[derive(Clone)] + struct MockProcedure { + delay: Duration, + } + + #[async_trait] + impl ProcedureService for MockProcedure { + async fn query( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("query is not used in this test")) + } + + async fn ddl( + &self, + _request: Request, + ) -> Result, Status> { + tokio::time::sleep(self.delay).await; + Ok(Response::new(DdlTaskResponse { + header: Some(ResponseHeader { + protocol_version: 0, + error: None, + }), + ..Default::default() + })) + } + + async fn reconcile( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("reconcile is not used in this test")) + } + + async fn migrate( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("migrate is not used in this test")) + } + + async fn details( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("details is not used in this test")) + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_meta_client_ddl_request_timeout() { + common_telemetry::init_default_ut_logging(); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let addr_str = addr.to_string(); + + let heartbeat = MockHeartbeat { + leader_addr: addr_str.clone(), + }; + let procedure = MockProcedure { + delay: Duration::from_secs(4), + }; + + let server = tonic::transport::Server::builder() + .add_service( + HeartbeatServer::new(heartbeat).accept_compressed(CompressionEncoding::Zstd), + ) + .add_service( + ProcedureServiceServer::new(procedure).accept_compressed(CompressionEncoding::Zstd), + ) + .serve_with_incoming(TcpListenerStream::new(listener)); + let server_handle = tokio::spawn(server); + + let mut client = MetaClientBuilder::new(0, Role::Frontend) + .enable_heartbeat() + .enable_procedure() + .build(); + client.start(&[addr_str.as_str()]).await.unwrap(); + + let mut request = SubmitDdlTaskRequest::new( + QueryContext::arc(), + DdlTask::new_comment_on(CommentOnTask { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + object_type: CommentObjectType::Table, + object_name: "test_table".to_string(), + column_name: None, + object_id: None, + comment: Some("timeout".to_string()), + }), + ); + request.timeout = Duration::from_secs(1); + + let now = Instant::now(); + let err = client.submit_ddl_task(request).await.unwrap_err(); + let elapsed = now.elapsed(); + // The request should be cancelled within 1 second. + assert!(elapsed < Duration::from_secs(2)); + info!("err: {err:?}, code: {}", err.status_code()); + assert_eq!(err.status_code(), StatusCode::Cancelled); + let err_msg = err.to_string(); + assert!( + err_msg.contains("Timeout expired"), + "unexpected error: {err_msg}" + ); + + server_handle.abort(); + } +} diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 715154a8e5..91c3e8e0b2 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -95,18 +95,22 @@ pub async fn create_meta_client( }; let base_config = ChannelConfig::new() - .timeout(meta_client_options.timeout) + .timeout(Some(meta_client_options.timeout)) .connect_timeout(meta_client_options.connect_timeout) .tcp_nodelay(meta_client_options.tcp_nodelay); let heartbeat_config = base_config .clone() - .timeout(HEARTBEAT_TIMEOUT) + .timeout(Some(HEARTBEAT_TIMEOUT)) .http2_keep_alive_interval(HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS) .http2_keep_alive_timeout(HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS); if let MetaClientType::Frontend = client_type { - let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout); - builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config, None)); + // Unset the timeout in the DDL channel manager, + // delegating timeout control to each individual request rather than the channel manager itself. + let ddl_config = base_config.clone().timeout(None); + builder = builder + .ddl_timeout(meta_client_options.ddl_timeout) + .ddl_channel_manager(ChannelManager::with_config(ddl_config, None)); if let Some(plugins) = plugins { let region_follower = plugins.get::(); if let Some(region_follower) = region_follower { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 8881044c34..58039d2de4 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -294,7 +294,7 @@ impl MetasrvBuilder { let memory_region_keeper = Arc::new(MemoryRegionKeeper::default()); let node_manager = node_manager.unwrap_or_else(|| { let datanode_client_channel_config = ChannelConfig::new() - .timeout(options.datanode.client.timeout) + .timeout(Some(options.datanode.client.timeout)) .connect_timeout(options.datanode.client.connect_timeout) .tcp_nodelay(options.datanode.client.tcp_nodelay); Arc::new(NodeClients::new(datanode_client_channel_config)) diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index c805f8ea1b..08cb32e11a 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -131,7 +131,7 @@ pub async fn mock( }); let config = ChannelConfig::new() - .timeout(Duration::from_secs(10)) + .timeout(Some(Duration::from_secs(10))) .connect_timeout(Duration::from_secs(10)) .tcp_nodelay(true); let channel_manager = ChannelManager::with_config(config, None); diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index dde3fb6a37..20dd0ed1ae 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -75,7 +75,8 @@ impl procedure_service_server::ProcedureService for Metasrv { header, query_context, task, - .. + wait, + timeout_secs, } = request.into_inner(); let header = header.context(error::MissingRequestHeaderSnafu)?; @@ -97,6 +98,8 @@ impl procedure_service_server::ProcedureService for Metasrv { }, SubmitDdlTaskRequest { query_context: Arc::new(query_context), + wait, + timeout: Duration::from_secs(timeout_secs.into()), task, }, ) diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index 82ddb12e20..f3de3a9cd7 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -76,6 +76,7 @@ tonic.workspace = true tracing.workspace = true [dev-dependencies] +catalog = { workspace = true, features = ["testing"] } common-meta = { workspace = true, features = ["testing"] } common-test-util.workspace = true path-slash = "0.2" diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index ff3d00eb0a..d8a8d54d43 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -695,18 +695,24 @@ pub struct RepartitionRequest { pub table_name: String, pub from_exprs: Vec, pub into_exprs: Vec, + pub options: OptionMap, } pub(crate) fn to_repartition_request( alter_table: AlterTable, query_ctx: &QueryContextRef, ) -> Result { - let (catalog_name, schema_name, table_name) = - table_idents_to_full_name(alter_table.table_name(), query_ctx) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; + let AlterTable { + table_name, + alter_operation, + options, + } = alter_table; - let AlterTableOperation::Repartition { operation } = alter_table.alter_operation else { + let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let AlterTableOperation::Repartition { operation } = alter_operation else { return InvalidSqlSnafu { err_msg: "expected REPARTITION operation", } @@ -719,6 +725,7 @@ pub(crate) fn to_repartition_request( table_name, from_exprs: operation.from_exprs, into_exprs: operation.into_exprs, + options, }) } diff --git a/src/operator/src/statement/comment.rs b/src/operator/src/statement/comment.rs index d82d059ad9..bf894a50c8 100644 --- a/src/operator/src/statement/comment.rs +++ b/src/operator/src/statement/comment.rs @@ -40,10 +40,8 @@ impl StatementExecutor { pub async fn comment(&self, stmt: Comment, query_ctx: QueryContextRef) -> Result { let comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?; - let request = SubmitDdlTaskRequest { - task: DdlTask::new_comment_on(comment_on_task), - query_context: query_ctx, - }; + let request = + SubmitDdlTaskRequest::new(query_ctx, DdlTask::new_comment_on(comment_on_task)); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -59,10 +57,8 @@ impl StatementExecutor { ) -> Result { let comment_on_task = self.create_comment_on_task_from_expr(expr)?; - let request = SubmitDdlTaskRequest { - task: DdlTask::new_comment_on(comment_on_task), - query_context: query_ctx, - }; + let request = + SubmitDdlTaskRequest::new(query_ctx, DdlTask::new_comment_on(comment_on_task)); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index f0cb696a97..13812c10f8 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -14,6 +14,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use api::helper::ColumnDataTypeWrapper; use api::v1::alter_table_expr::Kind; @@ -46,6 +47,7 @@ use common_meta::rpc::ddl::{ SubmitDdlTaskResponse, }; use common_query::Output; +use common_recordbatch::{RecordBatch, RecordBatches}; use common_sql::convert::sql_value_to_value; use common_telemetry::{debug, info, tracing, warn}; use common_time::{Timestamp, Timezone}; @@ -54,6 +56,8 @@ use datafusion_expr::LogicalPlan; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema, Schema}; use datatypes::value::Value; +use datatypes::vectors::{StringVector, VectorRef}; +use humantime::parse_duration; use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::multi_dim::MultiDimPartitionRule; use query::parser::QueryStatement; @@ -64,6 +68,7 @@ use session::context::QueryContextRef; use session::table_name::table_idents_to_full_name; use snafu::{OptionExt, ResultExt, ensure}; use sql::parser::{ParseOptions, ParserContext}; +use sql::statements::OptionMap; #[cfg(feature = "enterprise")] use sql::statements::alter::trigger::AlterTrigger; use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation}; @@ -79,7 +84,9 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::TableRef; use table::dist_table::DistTable; use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; -use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions}; +use table::requests::{ + AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions, +}; use table::table_name::TableName; use table::table_reference::TableReference; @@ -98,6 +105,51 @@ use crate::expr_helper::{self, RepartitionRequest}; use crate::statement::StatementExecutor; use crate::statement::show::create_partitions_stmt; +#[derive(Debug, Clone, Copy)] +struct DdlSubmitOptions { + wait: bool, + timeout: Duration, +} + +fn build_procedure_id_output(procedure_id: Vec) -> Result { + let procedure_id = String::from_utf8_lossy(&procedure_id).to_string(); + let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id])); + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "Procedure ID", + vector.data_type(), + false, + )])); + let batch = + RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?; + let batches = + RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?; + Ok(Output::new_with_record_batches(batches)) +} + +fn parse_ddl_options(options: &OptionMap) -> Result { + let wait = match options.get(DDL_WAIT) { + Some(value) => value.parse::().map_err(|_| { + InvalidSqlSnafu { + err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"), + } + .build() + })?, + None => SubmitDdlTaskRequest::default_wait(), + }; + + let timeout = match options.get(DDL_TIMEOUT) { + Some(value) => parse_duration(value).map_err(|err| { + InvalidSqlSnafu { + err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"), + } + .build() + })?, + None => SubmitDdlTaskRequest::default_timeout(), + }; + + Ok(DdlSubmitOptions { wait, timeout }) +} + impl StatementExecutor { pub fn catalog_manager(&self) -> CatalogManagerRef { self.catalog_manager.clone() @@ -505,10 +557,7 @@ impl StatementExecutor { }) .context(error::InvalidExprSnafu)?; - let request = SubmitDdlTaskRequest { - query_context, - task: DdlTask::new_create_trigger(task), - }; + let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_trigger(task)); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -558,10 +607,7 @@ impl StatementExecutor { create_flow: Some(expr), }) .context(error::InvalidExprSnafu)?; - let request = SubmitDdlTaskRequest { - query_context, - task: DdlTask::new_create_flow(task), - }; + let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_flow(task)); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -856,10 +902,8 @@ impl StatementExecutor { table_type: TableType::View, }; - let request = SubmitDdlTaskRequest { - query_context: ctx, - task: DdlTask::new_create_view(expr, view_info.clone()), - }; + let request = + SubmitDdlTaskRequest::new(ctx, DdlTask::new_create_view(expr, view_info.clone())); let resp = self .procedure_executor @@ -942,10 +986,7 @@ impl StatementExecutor { expr: DropFlowTask, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { - query_context, - task: DdlTask::new_drop_flow(expr), - }; + let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_flow(expr)); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -977,10 +1018,7 @@ impl StatementExecutor { expr: DropTriggerTask, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { - query_context, - task: DdlTask::new_drop_trigger(expr), - }; + let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_trigger(expr)); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1045,10 +1083,7 @@ impl StatementExecutor { expr: DropViewTask, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { - query_context, - task: DdlTask::new_drop_view(expr), - }; + let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_view(expr)); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1418,6 +1453,7 @@ impl StatementExecutor { new_partition_exprs_len ); + let ddl_options = parse_ddl_options(&request.options)?; let serialize_exprs = |exprs: Vec| -> Result> { let mut json_exprs = Vec::with_capacity(exprs.len()); for expr in exprs { @@ -1427,20 +1463,32 @@ impl StatementExecutor { }; let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?; let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?; - let req = SubmitDdlTaskRequest { - query_context: query_context.clone(), - task: DdlTask::new_alter_table(AlterTableExpr { + let mut req = SubmitDdlTaskRequest::new( + query_context.clone(), + DdlTask::new_alter_table(AlterTableExpr { catalog_name: request.catalog_name.clone(), schema_name: request.schema_name.clone(), table_name: request.table_name.clone(), kind: Some(Kind::Repartition(Repartition { from_partition_exprs: from_partition_exprs_json, into_partition_exprs: into_partition_exprs_json, - // TODO(weny): allow passing 'wait' from SQL options or QueryContext - wait: true, })), }), - }; + ); + req.wait = ddl_options.wait; + req.timeout = ddl_options.timeout; + + let response = self + .procedure_executor + .submit_ddl_task(&ExecutorContext::default(), req) + .await + .context(error::ExecuteDdlSnafu)?; + + if !ddl_options.wait { + return build_procedure_id_output(response.key); + } + + // Only invalidate cache if wait is true. let invalidate_keys = vec![ CacheIdent::TableId(table_id), CacheIdent::TableName(TableName::new( @@ -1449,10 +1497,6 @@ impl StatementExecutor { request.table_name, )), ]; - self.procedure_executor - .submit_ddl_task(&ExecutorContext::default(), req) - .await - .context(error::ExecuteDdlSnafu)?; // Invalidates local cache ASAP. self.cache_invalidator @@ -1524,10 +1568,7 @@ impl StatementExecutor { let (req, invalidate_keys) = if physical_table_id == table_id { // This is physical table - let req = SubmitDdlTaskRequest { - query_context, - task: DdlTask::new_alter_table(expr), - }; + let req = SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_table(expr)); let invalidate_keys = vec![ CacheIdent::TableId(table_id), @@ -1537,10 +1578,10 @@ impl StatementExecutor { (req, invalidate_keys) } else { // This is logical table - let req = SubmitDdlTaskRequest { + let req = SubmitDdlTaskRequest::new( query_context, - task: DdlTask::new_alter_logical_tables(vec![expr]), - }; + DdlTask::new_alter_logical_tables(vec![expr]), + ); let mut invalidate_keys = vec![ CacheIdent::TableId(physical_table_id), @@ -1658,10 +1699,10 @@ impl StatementExecutor { .map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu)) .collect::>>()?; - let request = SubmitDdlTaskRequest { + let request = SubmitDdlTaskRequest::new( query_context, - task: DdlTask::new_create_table(create_table, partitions, table_info), - }; + DdlTask::new_create_table(create_table, partitions, table_info), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1674,10 +1715,10 @@ impl StatementExecutor { tables_data: Vec<(CreateTableExpr, RawTableInfo)>, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { + let request = SubmitDdlTaskRequest::new( query_context, - task: DdlTask::new_create_logical_tables(tables_data), - }; + DdlTask::new_create_logical_tables(tables_data), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1690,10 +1731,10 @@ impl StatementExecutor { tables_data: Vec, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { + let request = SubmitDdlTaskRequest::new( query_context, - task: DdlTask::new_alter_logical_tables(tables_data), - }; + DdlTask::new_alter_logical_tables(tables_data), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1708,16 +1749,16 @@ impl StatementExecutor { drop_if_exists: bool, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { + let request = SubmitDdlTaskRequest::new( query_context, - task: DdlTask::new_drop_table( + DdlTask::new_drop_table( table_name.catalog_name.clone(), table_name.schema_name.clone(), table_name.table_name.clone(), table_id, drop_if_exists, ), - }; + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1732,10 +1773,10 @@ impl StatementExecutor { drop_if_exists: bool, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { + let request = SubmitDdlTaskRequest::new( query_context, - task: DdlTask::new_drop_database(catalog, schema, drop_if_exists), - }; + DdlTask::new_drop_database(catalog, schema, drop_if_exists), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1748,10 +1789,8 @@ impl StatementExecutor { alter_expr: AlterDatabaseExpr, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { - query_context, - task: DdlTask::new_alter_database(alter_expr), - }; + let request = + SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_database(alter_expr)); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1766,16 +1805,16 @@ impl StatementExecutor { time_ranges: Vec<(Timestamp, Timestamp)>, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { + let request = SubmitDdlTaskRequest::new( query_context, - task: DdlTask::new_truncate_table( + DdlTask::new_truncate_table( table_name.catalog_name.clone(), table_name.schema_name.clone(), table_name.table_name.clone(), table_id, time_ranges, ), - }; + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1838,10 +1877,10 @@ impl StatementExecutor { options: HashMap, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest { + let request = SubmitDdlTaskRequest::new( query_context, - task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options), - }; + DdlTask::new_create_database(catalog, database, create_if_not_exists, options), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -2204,6 +2243,8 @@ fn convert_value( #[cfg(test)] mod test { + use std::time::Duration; + use session::context::{QueryContext, QueryContextBuilder}; use sql::dialect::GreptimeDbDialect; use sql::parser::{ParseOptions, ParserContext}; @@ -2213,6 +2254,17 @@ mod test { use super::*; use crate::expr_helper; + #[test] + fn test_parse_ddl_options() { + let options = OptionMap::from([ + ("timeout".to_string(), "5m".to_string()), + ("wait".to_string(), "false".to_string()), + ]); + let ddl_options = parse_ddl_options(&options).unwrap(); + assert!(!ddl_options.wait); + assert_eq!(Duration::from_secs(300), ddl_options.timeout); + } + #[test] fn test_name_is_match() { assert!(!NAME_PATTERN_REG.is_match("/adaf")); diff --git a/src/sql/src/parsers/alter_parser.rs b/src/sql/src/parsers/alter_parser.rs index a71aebf4b5..df89f90922 100644 --- a/src/sql/src/parsers/alter_parser.rs +++ b/src/sql/src/parsers/alter_parser.rs @@ -30,7 +30,8 @@ use crate::error::{self, InvalidColumnOptionSnafu, Result, SetFulltextOptionSnaf use crate::parser::ParserContext; use crate::parsers::create_parser::INVERTED; use crate::parsers::utils::{ - validate_column_fulltext_create_option, validate_column_skipping_index_create_option, + parse_with_options, validate_column_fulltext_create_option, + validate_column_skipping_index_create_option, }; use crate::statements::OptionMap; use crate::statements::alter::{ @@ -181,7 +182,10 @@ impl ParserContext<'_> { } unexpected => self.unsupported(unexpected.to_string())?, }; - Ok(AlterTable::new(table_name, alter_operation)) + + let options = parse_with_options(&mut self.parser)?; + // TODO(weny): Respect the DDL options (e.g., WAIT and TIMEOUT) after the ALTER TABLE statement. + Ok(AlterTable::new(table_name, alter_operation, options)) } fn parse_alter_table_unset(&mut self) -> Result { @@ -1043,6 +1047,50 @@ ALTER TABLE metrics MERGE PARTITION ( } } + #[test] + fn test_parse_alter_table_merge_partition_with_options() { + let sql = r#" +ALTER TABLE alter_repartition_table MERGE PARTITION ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South' +) WITH ( + TIMEOUT = '5m', + WAIT = false +);"#; + let mut result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + assert_eq!(1, result.len()); + + let statement = result.remove(0); + assert_matches!(statement, Statement::AlterTable { .. }); + if let Statement::AlterTable(alter_table) = statement { + assert_matches!( + alter_table.alter_operation(), + AlterTableOperation::Repartition { .. } + ); + + if let AlterTableOperation::Repartition { operation } = alter_table.alter_operation() { + assert_eq!(operation.from_exprs.len(), 2); + assert_eq!( + operation.from_exprs[0].to_string(), + "device_id < 100 AND area < 'South'" + ); + assert_eq!( + operation.from_exprs[1].to_string(), + "device_id < 100 AND area >= 'South'" + ); + assert_eq!(operation.into_exprs.len(), 1); + } + + // Verify WITH options are parsed + let options = alter_table.options().to_str_map(); + assert_eq!(options.get("timeout").unwrap(), &"5m"); + assert_eq!(options.get("wait").unwrap(), &"false"); + assert_eq!(options.len(), 2); + } + } + #[test] fn test_parse_alter_table_repartition_multiple() { let sql = r#" diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index fe68a07669..33c3bf506f 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -31,19 +31,19 @@ use sqlparser::keywords::ALL_KEYWORDS; use sqlparser::parser::IsOptional::Mandatory; use sqlparser::parser::{Parser, ParserError}; use sqlparser::tokenizer::{Token, TokenWithSpan, Word}; -use table::requests::{validate_database_option, validate_table_option}; +use table::requests::validate_database_option; use crate::ast::{ColumnDef, Ident, ObjectNamePartExt}; use crate::error::{ self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu, - InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, - SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu, + InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu, + UnexpectedSnafu, UnsupportedSnafu, }; use crate::parser::{FLOW, ParserContext}; use crate::parsers::tql_parser; use crate::parsers::utils::{ - self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option, - validate_column_vector_index_create_option, + self, parse_with_options, validate_column_fulltext_create_option, + validate_column_skipping_index_create_option, validate_column_vector_index_create_option, }; use crate::statements::create::{ Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, @@ -447,17 +447,7 @@ impl<'a> ParserContext<'a> { } fn parse_create_table_options(&mut self) -> Result { - let options = self - .parser - .parse_options(Keyword::WITH) - .context(SyntaxSnafu)? - .into_iter() - .map(parse_option_string) - .collect::>>()?; - for key in options.keys() { - ensure!(validate_table_option(key), InvalidTableOptionSnafu { key }); - } - Ok(OptionMap::new(options)) + parse_with_options(&mut self.parser) } /// "PARTITION ON COLUMNS (...)" clause diff --git a/src/sql/src/parsers/utils.rs b/src/sql/src/parsers/utils.rs index 679466da2f..b6e4c44f93 100644 --- a/src/sql/src/parsers/utils.rs +++ b/src/sql/src/parsers/utils.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use chrono::Utc; @@ -39,13 +40,18 @@ use datatypes::schema::{ }; use snafu::{ResultExt, ensure}; use sqlparser::dialect::Dialect; +use sqlparser::keywords::Keyword; +use sqlparser::parser::Parser; +use table::requests::validate_table_option; use crate::error::{ - ConvertToLogicalExpressionSnafu, InvalidSqlSnafu, ParseSqlValueSnafu, Result, - SimplificationSnafu, + ConvertToLogicalExpressionSnafu, InvalidSqlSnafu, InvalidTableOptionSnafu, ParseSqlValueSnafu, + Result, SimplificationSnafu, SyntaxSnafu, }; use crate::parser::{ParseOptions, ParserContext}; +use crate::statements::OptionMap; use crate::statements::statement::Statement; +use crate::util::{OptionValue, parse_option_string}; /// Check if the given SQL query is a TQL statement. pub fn is_tql(dialect: &dyn Dialect, sql: &str) -> Result { @@ -272,6 +278,19 @@ pub fn convert_month_day_nano_to_duration( Ok(std::time::Duration::new(adjusted_seconds, nanos_remainder)) } +pub fn parse_with_options(parser: &mut Parser) -> Result { + let options = parser + .parse_options(Keyword::WITH) + .context(SyntaxSnafu)? + .into_iter() + .map(parse_option_string) + .collect::>>()?; + for key in options.keys() { + ensure!(validate_table_option(key), InvalidTableOptionSnafu { key }); + } + Ok(OptionMap::new(options)) +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/sql/src/statements/alter.rs b/src/sql/src/statements/alter.rs index ae5bce08c8..5b6a5ab5e6 100644 --- a/src/sql/src/statements/alter.rs +++ b/src/sql/src/statements/alter.rs @@ -25,17 +25,26 @@ use serde::Serialize; use sqlparser::ast::{ColumnDef, DataType, Expr, Ident, ObjectName, TableConstraint}; use sqlparser_derive::{Visit, VisitMut}; +use crate::statements::OptionMap; + #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] pub struct AlterTable { pub table_name: ObjectName, pub alter_operation: AlterTableOperation, + /// Table options in `WITH`. All keys are lowercase. + pub options: OptionMap, } impl AlterTable { - pub(crate) fn new(table_name: ObjectName, alter_operation: AlterTableOperation) -> Self { + pub(crate) fn new( + table_name: ObjectName, + alter_operation: AlterTableOperation, + options: OptionMap, + ) -> Self { Self { table_name, alter_operation, + options, } } @@ -47,6 +56,10 @@ impl AlterTable { &self.alter_operation } + pub fn options(&self) -> &OptionMap { + &self.options + } + pub fn alter_operation_mut(&mut self) -> &mut AlterTableOperation { &mut self.alter_operation } diff --git a/src/sql/src/statements/option_map.rs b/src/sql/src/statements/option_map.rs index d6bd4d7608..61f8badd0a 100644 --- a/src/sql/src/statements/option_map.rs +++ b/src/sql/src/statements/option_map.rs @@ -90,6 +90,9 @@ impl OptionMap { self.options.len() + self.secrets.len() } + /// Convert the option map to a string map. + /// + /// Notes: Not all values can be converted to a string, refer to [OptionValue::expr_as_string] for more details. pub fn to_str_map(&self) -> HashMap<&str, &str> { let mut map = HashMap::with_capacity(self.len()); map.extend( @@ -105,6 +108,9 @@ impl OptionMap { map } + /// Convert the option map to a string map. + /// + /// Notes: Not all values can be converted to a string, refer to [OptionValue::expr_as_string] for more details. pub fn into_map(self) -> HashMap { let mut map = HashMap::with_capacity(self.len()); map.extend( diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index 8cdf9ee800..4134c8bbf6 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -87,6 +87,7 @@ impl OptionValue { | Value::HexStringLiteral(s) => Some(s), Value::DollarQuotedString(s) => Some(&s.value), Value::Number(s, _) => Some(s), + Value::Boolean(b) => Some(if *b { "true" } else { "false" }), _ => None, }, Expr::Identifier(ident) => Some(&ident.value), @@ -94,6 +95,9 @@ impl OptionValue { } } + /// Convert the option value to a string. + /// + /// Notes: Not all values can be converted to a string, refer to [Self::expr_as_string] for more details. pub fn as_string(&self) -> Option<&str> { Self::expr_as_string(&self.0) } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 3b4058e083..43fc36644b 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -75,6 +75,11 @@ pub const VALID_TABLE_OPTION_KEYS: [&str; 12] = [ OTLP_METRIC_COMPAT_KEY, ]; +pub const DDL_TIMEOUT: &str = "timeout"; +pub const DDL_WAIT: &str = "wait"; + +pub const VALID_DDL_OPTION_KEYS: [&str; 2] = [DDL_TIMEOUT, DDL_WAIT]; + // Valid option keys when creating a db. static VALID_DB_OPT_KEYS: Lazy> = Lazy::new(|| { let mut set = HashSet::new(); @@ -115,7 +120,7 @@ pub fn validate_table_option(key: &str) -> bool { return true; } - VALID_TABLE_OPTION_KEYS.contains(&key) + VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key) } #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 282ab110e7..7903699453 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -200,7 +200,7 @@ impl GreptimeDbClusterBuilder { guards: Vec, ) -> GreptimeDbCluster { let datanodes = datanode_options.len(); - let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20)); + let channel_config = ChannelConfig::new().timeout(Some(Duration::from_secs(20))); let datanode_clients = Arc::new(NodeClients::new(channel_config)); let opt = MetasrvOptions { diff --git a/tests/cases/distributed/repartition/repartition.result b/tests/cases/distributed/repartition/repartition.result index 0df60a0ac2..bf54a632b4 100644 --- a/tests/cases/distributed/repartition/repartition.result +++ b/tests/cases/distributed/repartition/repartition.result @@ -99,6 +99,41 @@ DROP TABLE alter_repartition_table; Affected Rows: 0 +CREATE TABLE alter_repartition_table_with_options( + device_id INT, + area STRING, + ty STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(device_id) +) PARTITION ON COLUMNS (device_id, area) ( + device_id < 100, + device_id >= 100 AND device_id < 200, + device_id >= 200 +); + +Affected Rows: 0 + +-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-4[0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}) PROC_ID +ALTER TABLE alter_repartition_table_with_options REPARTITION ( + device_id < 100 +) INTO ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South' +) WITH ( + TIMEOUT = '5m', + WAIT = false +); + ++--------------------------------------+ +| Procedure ID | ++--------------------------------------+ +| PROC_ID | ++--------------------------------------+ + +DROP TABLE alter_repartition_table_with_options; + +Affected Rows: 0 + -- Metric engine repartition test CREATE TABLE metric_physical_table ( ts TIMESTAMP TIME INDEX, diff --git a/tests/cases/distributed/repartition/repartition.sql b/tests/cases/distributed/repartition/repartition.sql index d2051a9a15..92b1d6ad97 100644 --- a/tests/cases/distributed/repartition/repartition.sql +++ b/tests/cases/distributed/repartition/repartition.sql @@ -47,6 +47,31 @@ ALTER TABLE alter_repartition_table REPARTITION ( DROP TABLE alter_repartition_table; +CREATE TABLE alter_repartition_table_with_options( + device_id INT, + area STRING, + ty STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY(device_id) +) PARTITION ON COLUMNS (device_id, area) ( + device_id < 100, + device_id >= 100 AND device_id < 200, + device_id >= 200 +); + +-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-4[0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}) PROC_ID +ALTER TABLE alter_repartition_table_with_options REPARTITION ( + device_id < 100 +) INTO ( + device_id < 100 AND area < 'South', + device_id < 100 AND area >= 'South' +) WITH ( + TIMEOUT = '5m', + WAIT = false +); + +DROP TABLE alter_repartition_table_with_options; + -- Metric engine repartition test CREATE TABLE metric_physical_table ( ts TIMESTAMP TIME INDEX,