feat: add ddl timeout/wait options, repartition WITH parsing, meta-client startup refactor (#7589)

* feat: add ddl request timeouts and unify meta client startup

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: omplement ALTER TABLE repartition DDL options parsing

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add sqlness tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: pass timeout argument to procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: refine comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: assert timeout

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-01-20 17:26:53 +08:00
committed by GitHub
parent aa3daf7053
commit 25687bb282
33 changed files with 716 additions and 322 deletions

3
Cargo.lock generated
View File

@@ -5725,7 +5725,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1353b0ada9e17890c7ba0e402ba29b2b57816ff1#1353b0ada9e17890c7ba0e402ba29b2b57816ff1"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=7755821c27295f4ea34a52fab6f993a55d8f9377#7755821c27295f4ea34a52fab6f993a55d8f9377"
dependencies = [
"prost 0.14.1",
"prost-types 0.14.1",
@@ -7772,6 +7772,7 @@ dependencies = [
"meta-srv",
"rand 0.9.1",
"serde",
"session",
"snafu 0.8.6",
"tokio",
"tokio-stream",

View File

@@ -152,7 +152,7 @@ etcd-client = { version = "0.17", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1353b0ada9e17890c7ba0e402ba29b2b57816ff1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "7755821c27295f4ea34a52fab6f993a55d8f9377" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -96,7 +96,7 @@ impl RepairLogicalTablesCommand {
self.validate().map_err(BoxedError::new)?;
let kv_backend = self.store.build().await?;
let node_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_secs(self.client_timeout_secs))
.timeout(Some(Duration::from_secs(self.client_timeout_secs)))
.connect_timeout(Duration::from_secs(self.client_connect_timeout_secs));
let node_manager = Arc::new(NodeClients::new(node_client_channel_config));

View File

@@ -103,7 +103,7 @@ impl MetaClientSelector {
pub fn new(meta_client: MetaClientRef) -> Self {
let cfg = ChannelConfig::new()
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(30));
.timeout(Some(Duration::from_secs(30)));
let channel_manager = ChannelManager::with_config(cfg, None);
Self {
meta_client,

View File

@@ -413,8 +413,8 @@ impl ChannelConfig {
}
/// A timeout to each request.
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}
@@ -660,7 +660,7 @@ mod tests {
);
let cfg = default_cfg
.timeout(Duration::from_secs(3))
.timeout(Some(Duration::from_secs(3)))
.connect_timeout(Duration::from_secs(5))
.concurrency_limit(6)
.rate_limit(5, Duration::from_secs(1))
@@ -713,7 +713,7 @@ mod tests {
#[test]
fn test_build_endpoint() {
let config = ChannelConfig::new()
.timeout(Duration::from_secs(3))
.timeout(Some(Duration::from_secs(3)))
.connect_timeout(Duration::from_secs(5))
.concurrency_limit(6)
.rate_limit(5, Duration::from_secs(1))

View File

@@ -170,6 +170,25 @@ pub trait RepartitionProcedureFactory: Send + Sync {
) -> std::result::Result<(), BoxedError>;
}
/// The options for DDL tasks.
///
/// Note: These options may not be utilized by all procedures.
/// At present, they are specifically applied in `RepartitionProcedure`.
#[derive(Debug, Clone, Copy)]
pub struct DdlOptions {
/// The timeout will be passed to the procedure.
///
/// Note: Each procedure may implement its own timeout handling mechanism.
pub timeout: Duration,
/// The flag that controls whether to wait for the procedure to complete.
///
/// If wait is `true`, the procedure will wait for completion(success or failure) and the result will be returned.
/// Otherwise, the procedure will be submitted and return the [ProcedureId](common_procedure::ProcedureId) immediately.
///
/// Note: The value of `wait` is independent of the `timeout` option. If a procedure ignores the `timeout` and `wait` is set to true, the operation returns until the procedure completes.
pub wait: bool,
}
impl DdlManager {
/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
pub fn try_new(
@@ -265,8 +284,9 @@ impl DdlManager {
Repartition {
from_partition_exprs,
into_partition_exprs,
wait,
}: Repartition,
wait: bool,
timeout: Duration,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
@@ -278,8 +298,7 @@ impl DdlManager {
table_id,
from_partition_exprs,
into_partition_exprs,
// TODO(weny): get timeout from the proto.
None,
Some(timeout),
)
.context(CreateRepartitionProcedureSnafu)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -298,6 +317,7 @@ impl DdlManager {
&self,
table_id: TableId,
alter_table_task: AlterTableTask,
ddl_options: DdlOptions,
) -> Result<(ProcedureId, Option<Output>)> {
// make alter_table_task mutable so we can call .take() on its field
let mut alter_table_task = alter_table_task;
@@ -311,7 +331,13 @@ impl DdlManager {
alter_table_task.alter_table.table_name,
);
return self
.submit_repartition_task(table_id, table_name, repartition)
.submit_repartition_task(
table_id,
table_name,
repartition,
ddl_options.wait,
ddl_options.timeout,
)
.await;
}
@@ -566,6 +592,10 @@ impl DdlManager {
.map(TracingContext::from_w3c)
.unwrap_or_else(TracingContext::from_current_span)
.attach(tracing::info_span!("DdlManager::submit_ddl_task"));
let ddl_options = DdlOptions {
wait: request.wait,
timeout: request.timeout,
};
async move {
debug!("Submitting Ddl task: {:?}", request.task);
match request.task {
@@ -574,7 +604,7 @@ impl DdlManager {
}
DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
AlterTable(alter_table_task) => {
handle_alter_table_task(self, alter_table_task).await
handle_alter_table_task(self, alter_table_task, ddl_options).await
}
TruncateTable(truncate_table_task) => {
handle_truncate_table_task(self, truncate_table_task).await
@@ -660,6 +690,7 @@ async fn handle_truncate_table_task(
async fn handle_alter_table_task(
ddl_manager: &DdlManager,
alter_table_task: AlterTableTask,
ddl_options: DdlOptions,
) -> Result<SubmitDdlTaskResponse> {
let table_ref = alter_table_task.table_ref();
@@ -692,7 +723,7 @@ async fn handle_alter_table_task(
);
let (id, _) = ddl_manager
.submit_alter_table_task(table_id, alter_table_task)
.submit_alter_table_task(table_id, alter_table_task, ddl_options)
.await?;
info!("Table: {table_id} is altered via procedure_id {id:?}");

View File

@@ -17,6 +17,7 @@ pub mod trigger;
use std::collections::{HashMap, HashSet};
use std::result;
use std::time::Duration;
use api::helper::{from_pb_time_ranges, to_pb_time_ranges};
use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
@@ -293,9 +294,33 @@ impl TryFrom<Task> for DdlTask {
#[derive(Clone)]
pub struct SubmitDdlTaskRequest {
pub query_context: QueryContextRef,
pub wait: bool,
pub timeout: Duration,
pub task: DdlTask,
}
impl SubmitDdlTaskRequest {
/// The default constructor for [`SubmitDdlTaskRequest`].
pub fn new(query_context: QueryContextRef, task: DdlTask) -> Self {
Self {
query_context,
wait: Self::default_wait(),
timeout: Self::default_timeout(),
task,
}
}
/// The default timeout for a DDL task.
pub fn default_timeout() -> Duration {
Duration::from_secs(60)
}
/// The default wait for a DDL task.
pub fn default_wait() -> bool {
true
}
}
impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
type Error = error::Error;
@@ -346,6 +371,8 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
Ok(Self {
header: None,
query_context: Some((*request.query_context).clone().into()),
timeout_secs: request.timeout.as_secs() as u32,
wait: request.wait,
task: Some(task),
})
}

View File

@@ -149,7 +149,7 @@ impl FrontendClient {
chnl_mgr: {
let cfg = ChannelConfig::new()
.connect_timeout(batch_opts.grpc_conn_timeout)
.timeout(batch_opts.query_timeout);
.timeout(Some(batch_opts.query_timeout));
let tls_config = load_client_tls_config(batch_opts.frontend_tls.clone())
.context(InvalidClientConfigSnafu)?;
@@ -550,7 +550,7 @@ mod tests {
.is_ok()
);
let meta_client = Arc::new(MetaClient::default());
let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
let client = FrontendClient::from_meta_client(
meta_client,
None,

View File

@@ -30,6 +30,7 @@ tonic.workspace = true
datatypes.workspace = true
futures.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
session.workspace = true
tower.workspace = true
tracing = "0.1"
tracing-subscriber.workspace = true

View File

@@ -33,7 +33,7 @@ fn main() {
async fn run() {
let id = 2000u64;
let config = ChannelConfig::new()
.timeout(Duration::from_secs(3))
.timeout(Some(Duration::from_secs(3)))
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config, None);

View File

@@ -23,6 +23,7 @@ mod util;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{
MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
@@ -63,6 +64,7 @@ use snafu::{OptionExt, ResultExt};
use store::Client as StoreClient;
pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::client::ask_leader::{LeaderProviderFactoryImpl, LeaderProviderFactoryRef};
use crate::error::{
ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, GetFlowStatSnafu, NotStartedSnafu,
Result,
@@ -73,6 +75,7 @@ pub type Id = u64;
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
const DEFAULT_CLUSTER_CLIENT_MAX_RETRY: usize = 3;
const DEFAULT_DDL_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
@@ -85,6 +88,8 @@ pub struct MetaClientBuilder {
region_follower: Option<RegionFollowerClientRef>,
channel_manager: Option<ChannelManager>,
ddl_channel_manager: Option<ChannelManager>,
/// The default ddl timeout for each request.
ddl_timeout: Option<Duration>,
heartbeat_channel_manager: Option<ChannelManager>,
}
@@ -165,6 +170,13 @@ impl MetaClientBuilder {
}
}
pub fn ddl_timeout(self, timeout: Duration) -> Self {
Self {
ddl_timeout: Some(timeout),
..self
}
}
pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
Self {
heartbeat_channel_manager: Some(channel_manager),
@@ -180,62 +192,60 @@ impl MetaClientBuilder {
}
pub fn build(self) -> MetaClient {
let mut client = if let Some(mgr) = self.channel_manager {
MetaClient::with_channel_manager(self.id, mgr)
} else {
MetaClient::new(self.id)
};
let mgr = self.channel_manager.unwrap_or_default();
let heartbeat_channel_manager = self
.heartbeat_channel_manager
.clone()
.unwrap_or_else(|| mgr.clone());
let mgr = client.channel_manager.clone();
if self.enable_heartbeat {
let heartbeat = self.enable_heartbeat.then(|| {
if self.heartbeat_channel_manager.is_some() {
info!("Enable heartbeat channel using the heartbeat channel manager.");
}
let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
client.heartbeat = Some(HeartbeatClient::new(
self.id,
self.role,
mgr,
DEFAULT_ASK_LEADER_MAX_RETRY,
));
}
if self.enable_store {
client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_procedure {
HeartbeatClient::new(self.id, self.role, heartbeat_channel_manager.clone())
});
let store = self
.enable_store
.then(|| StoreClient::new(self.id, self.role, mgr.clone()));
let procedure = self.enable_procedure.then(|| {
let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
client.procedure = Some(ProcedureClient::new(
ProcedureClient::new(
self.id,
self.role,
mgr,
DEFAULT_SUBMIT_DDL_MAX_RETRY,
));
}
self.ddl_timeout.unwrap_or(DEFAULT_DDL_TIMEOUT),
)
});
let cluster = self
.enable_access_cluster_info
.then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY));
let region_follower = self.region_follower.clone();
if self.enable_access_cluster_info {
client.cluster = Some(ClusterClient::new(
MetaClient {
id: self.id,
channel_manager: mgr.clone(),
leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
self.id,
self.role,
mgr,
DEFAULT_CLUSTER_CLIENT_MAX_RETRY,
))
DEFAULT_ASK_LEADER_MAX_RETRY,
heartbeat_channel_manager,
)),
heartbeat,
store,
procedure,
cluster,
region_follower,
}
if let Some(region_follower) = self.region_follower {
client.region_follower = Some(region_follower);
}
client
}
}
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct MetaClient {
id: Id,
channel_manager: ChannelManager,
leader_provider_factory: LeaderProviderFactoryRef,
heartbeat: Option<HeartbeatClient>,
store: Option<StoreClient>,
procedure: Option<ProcedureClient>,
@@ -243,6 +253,26 @@ pub struct MetaClient {
region_follower: Option<RegionFollowerClientRef>,
}
impl MetaClient {
pub fn new(id: Id, role: Role) -> Self {
Self {
id,
channel_manager: ChannelManager::default(),
leader_provider_factory: Arc::new(LeaderProviderFactoryImpl::new(
id,
role,
DEFAULT_ASK_LEADER_MAX_RETRY,
ChannelManager::default(),
)),
heartbeat: None,
store: None,
procedure: None,
cluster: None,
region_follower: None,
}
}
}
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
/// A trait for clients that can manage region followers.
@@ -498,21 +528,6 @@ fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
}
impl MetaClient {
pub fn new(id: Id) -> Self {
Self {
id,
..Default::default()
}
}
pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self {
Self {
id,
channel_manager,
..Default::default()
}
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
@@ -520,29 +535,10 @@ impl MetaClient {
{
info!("MetaClient channel config: {:?}", self.channel_config());
if let Some(client) = &mut self.region_follower {
let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
client.start(&urls).await?;
info!("Region follower client started");
}
if let Some(client) = &mut self.heartbeat {
client.start(urls.clone()).await?;
info!("Heartbeat client started");
}
if let Some(client) = &mut self.store {
client.start(urls.clone()).await?;
info!("Store client started");
}
if let Some(client) = &mut self.procedure {
client.start(urls.clone()).await?;
info!("DDL client started");
}
if let Some(client) = &mut self.cluster {
client.start(urls).await?;
info!("Cluster client started");
}
let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
let leader_provider = self.leader_provider_factory.create(&urls);
Ok(())
self.start_with(leader_provider, urls).await
}
/// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.

View File

@@ -34,7 +34,7 @@ use crate::error::Result;
pub type LeaderProviderRef = Arc<dyn LeaderProvider>;
/// Provide [MetaClient] a Metasrv leader's address.
/// Provide [`MetaClient`] a Metasrv leader's address.
#[async_trait]
pub trait LeaderProvider: Debug + Send + Sync {
/// Get the leader of the Metasrv. If it returns `None`, or the leader is outdated,
@@ -45,6 +45,13 @@ pub trait LeaderProvider: Debug + Send + Sync {
async fn ask_leader(&self) -> Result<String>;
}
pub type LeaderProviderFactoryRef = Arc<dyn LeaderProviderFactory>;
/// A factory for creating [`LeaderProvider`] instances.
pub trait LeaderProviderFactory: Send + Sync + Debug {
fn create(&self, peers: &[&str]) -> LeaderProviderRef;
}
#[derive(Debug)]
struct LeadershipGroup {
leader: Option<String>,
@@ -206,3 +213,35 @@ impl LeaderProvider for AskLeader {
self.ask_leader().await
}
}
/// A factory for creating [`LeaderProvider`] instances.
#[derive(Clone, Debug)]
pub struct LeaderProviderFactoryImpl {
id: Id,
role: Role,
max_retry: usize,
channel_manager: ChannelManager,
}
impl LeaderProviderFactoryImpl {
pub fn new(id: Id, role: Role, max_retry: usize, channel_manager: ChannelManager) -> Self {
Self {
id,
role,
max_retry,
channel_manager,
}
}
}
impl LeaderProviderFactory for LeaderProviderFactoryImpl {
fn create(&self, peers: &[&str]) -> LeaderProviderRef {
Arc::new(AskLeader::new(
self.id,
self.role,
peers.iter().map(|p| p.to_string()).collect::<Vec<_>>(),
self.channel_manager.clone(),
self.max_retry,
))
}
}

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use api::greptime_proto::v1;
use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader, Role};
use api::v1::meta::{MetasrvNodeInfo, MetasrvPeersRequest, ResponseHeader};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::ChannelManager;
use common_meta::error::{
@@ -37,8 +37,7 @@ use tonic::Status;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use crate::client::ask_leader::AskLeader;
use crate::client::{Id, LeaderProviderRef, util};
use crate::client::{LeaderProviderRef, util};
use crate::error::{
ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu,
ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu,
@@ -50,10 +49,8 @@ pub struct Client {
}
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
pub fn new(channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
leader_provider: None,
max_retry,
@@ -62,15 +59,6 @@ impl Client {
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls)
}
/// Start the client with a [LeaderProvider].
pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
let mut inner = self.inner.write().await;
@@ -147,8 +135,6 @@ impl KvBackend for Client {
#[derive(Debug)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
leader_provider: Option<LeaderProviderRef>,
max_retry: usize,
@@ -166,26 +152,6 @@ impl Inner {
Ok(())
}
fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<Vec<_>>();
let ask_leader = AskLeader::new(
self.id,
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
);
self.start_with(Arc::new(ask_leader))
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
let channel = self.channel_manager.get(addr).context(CreateChannelSnafu)?;

View File

@@ -30,7 +30,6 @@ use tonic::Streaming;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use crate::client::ask_leader::AskLeader;
use crate::client::{Id, LeaderProviderRef};
use crate::error;
use crate::error::{InvalidResponseHeaderSnafu, Result};
@@ -149,25 +148,11 @@ pub struct Client {
}
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(
id,
role,
channel_manager,
max_retry,
)));
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls)
}
/// Start the client with a [LeaderProvider].
pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
let mut inner = self.inner.write().await;
@@ -194,17 +179,15 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
leader_provider: Option<LeaderProviderRef>,
max_retry: usize,
}
impl Inner {
fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
Self {
id,
role,
channel_manager,
leader_provider: None,
max_retry,
}
}
@@ -219,26 +202,6 @@ impl Inner {
Ok(())
}
fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<Vec<_>>();
let ask_leader = AskLeader::new(
self.id,
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
);
self.start_with(Arc::new(ask_leader))
}
async fn ask_leader(&self) -> Result<String> {
let Some(leader_provider) = self.leader_provider.as_ref() else {
return error::IllegalGrpcClientStateSnafu {
@@ -332,15 +295,20 @@ impl Inner {
#[cfg(test)]
mod test {
use super::*;
use crate::client::AskLeader;
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new(0, Role::Datanode, ChannelManager::default(), 3);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
.unwrap();
let res = client.start(&["127.0.0.1:1002"]).await;
let client = Client::new(0, Role::Datanode, ChannelManager::default());
let leader_provider = Arc::new(AskLeader::new(
0,
Role::Datanode,
vec!["127.0.0.1:1000".to_string(), "127.0.0.1:1001".to_string()],
ChannelManager::default(),
3,
));
client.start_with(leader_provider.clone()).await.unwrap();
let res = client.start_with(leader_provider).await;
assert!(res.is_err());
assert!(matches!(
res.err(),

View File

@@ -27,11 +27,10 @@ use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use snafu::{ResultExt, ensure};
use tokio::sync::RwLock;
use tonic::Status;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic::{Request, Status};
use crate::client::ask_leader::AskLeader;
use crate::client::{Id, LeaderProviderRef, util};
use crate::error;
use crate::error::Result;
@@ -42,27 +41,25 @@ pub struct Client {
}
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
pub fn new(
id: Id,
role: Role,
channel_manager: ChannelManager,
max_retry: usize,
timeout: Duration,
) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
leader_provider: None,
max_retry,
timeout,
}));
Self { inner }
}
pub async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls)
}
/// Start the client with a [LeaderProvider].
pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
let mut inner = self.inner.write().await;
@@ -117,6 +114,8 @@ struct Inner {
channel_manager: ChannelManager,
leader_provider: Option<LeaderProviderRef>,
max_retry: usize,
/// Request timeout.
timeout: Duration,
}
impl Inner {
@@ -131,26 +130,6 @@ impl Inner {
Ok(())
}
fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<Vec<_>>();
let ask_leader = AskLeader::new(
self.id,
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
);
self.start_with(Arc::new(ask_leader))
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureServiceClient<Channel>> {
let channel = self
.channel_manager
@@ -209,7 +188,7 @@ impl Inner {
times += 1;
continue;
} else {
error!("An error occurred in gRPC, status: {status}");
error!("An error occurred in gRPC, status: {status:?}");
return Err(error::Error::from(status));
}
}
@@ -250,7 +229,8 @@ impl Inner {
self.with_retry(
"migrate region",
move |mut client| {
let req = req.clone();
let mut req = Request::new(req.clone());
req.set_timeout(self.timeout);
async move { client.migrate(req).await.map(|res| res.into_inner()) }
},
@@ -270,7 +250,8 @@ impl Inner {
self.with_retry(
"reconcile",
move |mut client| {
let req = req.clone();
let mut req = Request::new(req.clone());
req.set_timeout(self.timeout);
async move { client.reconcile(req).await.map(|res| res.into_inner()) }
},
@@ -294,7 +275,8 @@ impl Inner {
self.with_retry(
"query procedure state",
move |mut client| {
let req = req.clone();
let mut req = Request::new(req.clone());
req.set_timeout(self.timeout);
async move { client.query(req).await.map(|res| res.into_inner()) }
},
@@ -309,11 +291,13 @@ impl Inner {
self.role,
TracingContext::from_current_span().to_w3c(),
);
let timeout = Duration::from_secs(req.timeout_secs.into());
self.with_retry(
"submit ddl task",
move |mut client| {
let req = req.clone();
let mut req = Request::new(req.clone());
req.set_timeout(timeout);
async move { client.ddl(req).await.map(|res| res.into_inner()) }
},
|resp: &DdlTaskResponse| &resp.header,
@@ -332,7 +316,8 @@ impl Inner {
self.with_retry(
"list procedure",
move |mut client| {
let req = req.clone();
let mut req = Request::new(req.clone());
req.set_timeout(self.timeout);
async move { client.details(req).await.map(|res| res.into_inner()) }
},
|resp: &ProcedureDetailResponse| &resp.header,
@@ -340,3 +325,175 @@ impl Inner {
.await
}
}
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use api::v1::meta::heartbeat_server::{Heartbeat, HeartbeatServer};
use api::v1::meta::procedure_service_server::{ProcedureService, ProcedureServiceServer};
use api::v1::meta::{
AskLeaderRequest, AskLeaderResponse, DdlTaskRequest, DdlTaskResponse, HeartbeatRequest,
HeartbeatResponse, MigrateRegionRequest, MigrateRegionResponse, Peer,
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse,
QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role,
};
use async_trait::async_trait;
use common_error::status_code::StatusCode;
use common_meta::rpc::ddl::{CommentObjectType, CommentOnTask, DdlTask, SubmitDdlTaskRequest};
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::info;
use session::context::QueryContext;
use tokio::net::TcpListener;
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tonic::codec::CompressionEncoding;
use tonic::{Request, Response, Status};
use crate::client::MetaClientBuilder;
#[derive(Clone)]
struct MockHeartbeat {
leader_addr: String,
}
#[async_trait]
impl Heartbeat for MockHeartbeat {
type HeartbeatStream = ReceiverStream<Result<HeartbeatResponse, Status>>;
async fn heartbeat(
&self,
_request: Request<tonic::Streaming<HeartbeatRequest>>,
) -> Result<Response<Self::HeartbeatStream>, Status> {
Err(Status::unimplemented(
"heartbeat stream is not used in this test",
))
}
async fn ask_leader(
&self,
_request: Request<AskLeaderRequest>,
) -> Result<Response<AskLeaderResponse>, Status> {
Ok(Response::new(AskLeaderResponse {
header: Some(ResponseHeader {
protocol_version: 0,
error: None,
}),
leader: Some(Peer {
id: 1,
addr: self.leader_addr.clone(),
}),
}))
}
}
#[derive(Clone)]
struct MockProcedure {
delay: Duration,
}
#[async_trait]
impl ProcedureService for MockProcedure {
async fn query(
&self,
_request: Request<QueryProcedureRequest>,
) -> Result<Response<ProcedureStateResponse>, Status> {
Err(Status::unimplemented("query is not used in this test"))
}
async fn ddl(
&self,
_request: Request<DdlTaskRequest>,
) -> Result<Response<DdlTaskResponse>, Status> {
tokio::time::sleep(self.delay).await;
Ok(Response::new(DdlTaskResponse {
header: Some(ResponseHeader {
protocol_version: 0,
error: None,
}),
..Default::default()
}))
}
async fn reconcile(
&self,
_request: Request<ReconcileRequest>,
) -> Result<Response<ReconcileResponse>, Status> {
Err(Status::unimplemented("reconcile is not used in this test"))
}
async fn migrate(
&self,
_request: Request<MigrateRegionRequest>,
) -> Result<Response<MigrateRegionResponse>, Status> {
Err(Status::unimplemented("migrate is not used in this test"))
}
async fn details(
&self,
_request: Request<ProcedureDetailRequest>,
) -> Result<Response<ProcedureDetailResponse>, Status> {
Err(Status::unimplemented("details is not used in this test"))
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_meta_client_ddl_request_timeout() {
common_telemetry::init_default_ut_logging();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let addr_str = addr.to_string();
let heartbeat = MockHeartbeat {
leader_addr: addr_str.clone(),
};
let procedure = MockProcedure {
delay: Duration::from_secs(4),
};
let server = tonic::transport::Server::builder()
.add_service(
HeartbeatServer::new(heartbeat).accept_compressed(CompressionEncoding::Zstd),
)
.add_service(
ProcedureServiceServer::new(procedure).accept_compressed(CompressionEncoding::Zstd),
)
.serve_with_incoming(TcpListenerStream::new(listener));
let server_handle = tokio::spawn(server);
let mut client = MetaClientBuilder::new(0, Role::Frontend)
.enable_heartbeat()
.enable_procedure()
.build();
client.start(&[addr_str.as_str()]).await.unwrap();
let mut request = SubmitDdlTaskRequest::new(
QueryContext::arc(),
DdlTask::new_comment_on(CommentOnTask {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
object_type: CommentObjectType::Table,
object_name: "test_table".to_string(),
column_name: None,
object_id: None,
comment: Some("timeout".to_string()),
}),
);
request.timeout = Duration::from_secs(1);
let now = Instant::now();
let err = client.submit_ddl_task(request).await.unwrap_err();
let elapsed = now.elapsed();
// The request should be cancelled within 1 second.
assert!(elapsed < Duration::from_secs(2));
info!("err: {err:?}, code: {}", err.status_code());
assert_eq!(err.status_code(), StatusCode::Cancelled);
let err_msg = err.to_string();
assert!(
err_msg.contains("Timeout expired"),
"unexpected error: {err_msg}"
);
server_handle.abort();
}
}

View File

@@ -95,18 +95,22 @@ pub async fn create_meta_client(
};
let base_config = ChannelConfig::new()
.timeout(meta_client_options.timeout)
.timeout(Some(meta_client_options.timeout))
.connect_timeout(meta_client_options.connect_timeout)
.tcp_nodelay(meta_client_options.tcp_nodelay);
let heartbeat_config = base_config
.clone()
.timeout(HEARTBEAT_TIMEOUT)
.timeout(Some(HEARTBEAT_TIMEOUT))
.http2_keep_alive_interval(HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS)
.http2_keep_alive_timeout(HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS);
if let MetaClientType::Frontend = client_type {
let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config, None));
// Unset the timeout in the DDL channel manager,
// delegating timeout control to each individual request rather than the channel manager itself.
let ddl_config = base_config.clone().timeout(None);
builder = builder
.ddl_timeout(meta_client_options.ddl_timeout)
.ddl_channel_manager(ChannelManager::with_config(ddl_config, None));
if let Some(plugins) = plugins {
let region_follower = plugins.get::<RegionFollowerClientRef>();
if let Some(region_follower) = region_follower {

View File

@@ -294,7 +294,7 @@ impl MetasrvBuilder {
let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
let node_manager = node_manager.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(options.datanode.client.timeout)
.timeout(Some(options.datanode.client.timeout))
.connect_timeout(options.datanode.client.connect_timeout)
.tcp_nodelay(options.datanode.client.tcp_nodelay);
Arc::new(NodeClients::new(datanode_client_channel_config))

View File

@@ -131,7 +131,7 @@ pub async fn mock(
});
let config = ChannelConfig::new()
.timeout(Duration::from_secs(10))
.timeout(Some(Duration::from_secs(10)))
.connect_timeout(Duration::from_secs(10))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config, None);

View File

@@ -75,7 +75,8 @@ impl procedure_service_server::ProcedureService for Metasrv {
header,
query_context,
task,
..
wait,
timeout_secs,
} = request.into_inner();
let header = header.context(error::MissingRequestHeaderSnafu)?;
@@ -97,6 +98,8 @@ impl procedure_service_server::ProcedureService for Metasrv {
},
SubmitDdlTaskRequest {
query_context: Arc::new(query_context),
wait,
timeout: Duration::from_secs(timeout_secs.into()),
task,
},
)

View File

@@ -76,6 +76,7 @@ tonic.workspace = true
tracing.workspace = true
[dev-dependencies]
catalog = { workspace = true, features = ["testing"] }
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
path-slash = "0.2"

View File

@@ -695,18 +695,24 @@ pub struct RepartitionRequest {
pub table_name: String,
pub from_exprs: Vec<Expr>,
pub into_exprs: Vec<Expr>,
pub options: OptionMap,
}
pub(crate) fn to_repartition_request(
alter_table: AlterTable,
query_ctx: &QueryContextRef,
) -> Result<RepartitionRequest> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(alter_table.table_name(), query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let AlterTable {
table_name,
alter_operation,
options,
} = alter_table;
let AlterTableOperation::Repartition { operation } = alter_table.alter_operation else {
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let AlterTableOperation::Repartition { operation } = alter_operation else {
return InvalidSqlSnafu {
err_msg: "expected REPARTITION operation",
}
@@ -719,6 +725,7 @@ pub(crate) fn to_repartition_request(
table_name,
from_exprs: operation.from_exprs,
into_exprs: operation.into_exprs,
options,
})
}

View File

@@ -40,10 +40,8 @@ impl StatementExecutor {
pub async fn comment(&self, stmt: Comment, query_ctx: QueryContextRef) -> Result<Output> {
let comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?;
let request = SubmitDdlTaskRequest {
task: DdlTask::new_comment_on(comment_on_task),
query_context: query_ctx,
};
let request =
SubmitDdlTaskRequest::new(query_ctx, DdlTask::new_comment_on(comment_on_task));
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -59,10 +57,8 @@ impl StatementExecutor {
) -> Result<Output> {
let comment_on_task = self.create_comment_on_task_from_expr(expr)?;
let request = SubmitDdlTaskRequest {
task: DdlTask::new_comment_on(comment_on_task),
query_context: query_ctx,
};
let request =
SubmitDdlTaskRequest::new(query_ctx, DdlTask::new_comment_on(comment_on_task));
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)

View File

@@ -14,6 +14,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_table_expr::Kind;
@@ -46,6 +47,7 @@ use common_meta::rpc::ddl::{
SubmitDdlTaskResponse,
};
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::{Timestamp, Timezone};
@@ -54,6 +56,8 @@ use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema};
use datatypes::value::Value;
use datatypes::vectors::{StringVector, VectorRef};
use humantime::parse_duration;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use query::parser::QueryStatement;
@@ -64,6 +68,7 @@ use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::OptionMap;
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
use sql::statements::alter::{AlterDatabase, AlterTable, AlterTableOperation};
@@ -79,7 +84,9 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::TableRef;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, COMMENT_KEY, TableOptions};
use table::requests::{
AlterKind, AlterTableRequest, COMMENT_KEY, DDL_TIMEOUT, DDL_WAIT, TableOptions,
};
use table::table_name::TableName;
use table::table_reference::TableReference;
@@ -98,6 +105,51 @@ use crate::expr_helper::{self, RepartitionRequest};
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;
#[derive(Debug, Clone, Copy)]
struct DdlSubmitOptions {
wait: bool,
timeout: Duration,
}
fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"Procedure ID",
vector.data_type(),
false,
)]));
let batch =
RecordBatch::new(schema.clone(), vec![vector]).context(error::BuildRecordBatchSnafu)?;
let batches =
RecordBatches::try_new(schema, vec![batch]).context(error::BuildRecordBatchSnafu)?;
Ok(Output::new_with_record_batches(batches))
}
fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
let wait = match options.get(DDL_WAIT) {
Some(value) => value.parse::<bool>().map_err(|_| {
InvalidSqlSnafu {
err_msg: format!("invalid DDL option '{DDL_WAIT}': '{value}'"),
}
.build()
})?,
None => SubmitDdlTaskRequest::default_wait(),
};
let timeout = match options.get(DDL_TIMEOUT) {
Some(value) => parse_duration(value).map_err(|err| {
InvalidSqlSnafu {
err_msg: format!("invalid DDL option '{DDL_TIMEOUT}': '{value}': {err}"),
}
.build()
})?,
None => SubmitDdlTaskRequest::default_timeout(),
};
Ok(DdlSubmitOptions { wait, timeout })
}
impl StatementExecutor {
pub fn catalog_manager(&self) -> CatalogManagerRef {
self.catalog_manager.clone()
@@ -505,10 +557,7 @@ impl StatementExecutor {
})
.context(error::InvalidExprSnafu)?;
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_trigger(task),
};
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_trigger(task));
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -558,10 +607,7 @@ impl StatementExecutor {
create_flow: Some(expr),
})
.context(error::InvalidExprSnafu)?;
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_create_flow(task),
};
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_flow(task));
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -856,10 +902,8 @@ impl StatementExecutor {
table_type: TableType::View,
};
let request = SubmitDdlTaskRequest {
query_context: ctx,
task: DdlTask::new_create_view(expr, view_info.clone()),
};
let request =
SubmitDdlTaskRequest::new(ctx, DdlTask::new_create_view(expr, view_info.clone()));
let resp = self
.procedure_executor
@@ -942,10 +986,7 @@ impl StatementExecutor {
expr: DropFlowTask,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_drop_flow(expr),
};
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_flow(expr));
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -977,10 +1018,7 @@ impl StatementExecutor {
expr: DropTriggerTask,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_drop_trigger(expr),
};
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_trigger(expr));
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -1045,10 +1083,7 @@ impl StatementExecutor {
expr: DropViewTask,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_drop_view(expr),
};
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_view(expr));
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -1418,6 +1453,7 @@ impl StatementExecutor {
new_partition_exprs_len
);
let ddl_options = parse_ddl_options(&request.options)?;
let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
let mut json_exprs = Vec::with_capacity(exprs.len());
for expr in exprs {
@@ -1427,20 +1463,32 @@ impl StatementExecutor {
};
let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
let req = SubmitDdlTaskRequest {
query_context: query_context.clone(),
task: DdlTask::new_alter_table(AlterTableExpr {
let mut req = SubmitDdlTaskRequest::new(
query_context.clone(),
DdlTask::new_alter_table(AlterTableExpr {
catalog_name: request.catalog_name.clone(),
schema_name: request.schema_name.clone(),
table_name: request.table_name.clone(),
kind: Some(Kind::Repartition(Repartition {
from_partition_exprs: from_partition_exprs_json,
into_partition_exprs: into_partition_exprs_json,
// TODO(weny): allow passing 'wait' from SQL options or QueryContext
wait: true,
})),
}),
};
);
req.wait = ddl_options.wait;
req.timeout = ddl_options.timeout;
let response = self
.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), req)
.await
.context(error::ExecuteDdlSnafu)?;
if !ddl_options.wait {
return build_procedure_id_output(response.key);
}
// Only invalidate cache if wait is true.
let invalidate_keys = vec![
CacheIdent::TableId(table_id),
CacheIdent::TableName(TableName::new(
@@ -1449,10 +1497,6 @@ impl StatementExecutor {
request.table_name,
)),
];
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), req)
.await
.context(error::ExecuteDdlSnafu)?;
// Invalidates local cache ASAP.
self.cache_invalidator
@@ -1524,10 +1568,7 @@ impl StatementExecutor {
let (req, invalidate_keys) = if physical_table_id == table_id {
// This is physical table
let req = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_table(expr),
};
let req = SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_table(expr));
let invalidate_keys = vec![
CacheIdent::TableId(table_id),
@@ -1537,10 +1578,10 @@ impl StatementExecutor {
(req, invalidate_keys)
} else {
// This is logical table
let req = SubmitDdlTaskRequest {
let req = SubmitDdlTaskRequest::new(
query_context,
task: DdlTask::new_alter_logical_tables(vec![expr]),
};
DdlTask::new_alter_logical_tables(vec![expr]),
);
let mut invalidate_keys = vec![
CacheIdent::TableId(physical_table_id),
@@ -1658,10 +1699,10 @@ impl StatementExecutor {
.map(|expr| expr.as_pb_partition().context(PartitionExprToPbSnafu))
.collect::<Result<Vec<_>>>()?;
let request = SubmitDdlTaskRequest {
let request = SubmitDdlTaskRequest::new(
query_context,
task: DdlTask::new_create_table(create_table, partitions, table_info),
};
DdlTask::new_create_table(create_table, partitions, table_info),
);
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -1674,10 +1715,10 @@ impl StatementExecutor {
tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
let request = SubmitDdlTaskRequest::new(
query_context,
task: DdlTask::new_create_logical_tables(tables_data),
};
DdlTask::new_create_logical_tables(tables_data),
);
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -1690,10 +1731,10 @@ impl StatementExecutor {
tables_data: Vec<AlterTableExpr>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
let request = SubmitDdlTaskRequest::new(
query_context,
task: DdlTask::new_alter_logical_tables(tables_data),
};
DdlTask::new_alter_logical_tables(tables_data),
);
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -1708,16 +1749,16 @@ impl StatementExecutor {
drop_if_exists: bool,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
let request = SubmitDdlTaskRequest::new(
query_context,
task: DdlTask::new_drop_table(
DdlTask::new_drop_table(
table_name.catalog_name.clone(),
table_name.schema_name.clone(),
table_name.table_name.clone(),
table_id,
drop_if_exists,
),
};
);
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -1732,10 +1773,10 @@ impl StatementExecutor {
drop_if_exists: bool,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
let request = SubmitDdlTaskRequest::new(
query_context,
task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
};
DdlTask::new_drop_database(catalog, schema, drop_if_exists),
);
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -1748,10 +1789,8 @@ impl StatementExecutor {
alter_expr: AlterDatabaseExpr,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
query_context,
task: DdlTask::new_alter_database(alter_expr),
};
let request =
SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_database(alter_expr));
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -1766,16 +1805,16 @@ impl StatementExecutor {
time_ranges: Vec<(Timestamp, Timestamp)>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
let request = SubmitDdlTaskRequest::new(
query_context,
task: DdlTask::new_truncate_table(
DdlTask::new_truncate_table(
table_name.catalog_name.clone(),
table_name.schema_name.clone(),
table_name.table_name.clone(),
table_id,
time_ranges,
),
};
);
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -1838,10 +1877,10 @@ impl StatementExecutor {
options: HashMap<String, String>,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest {
let request = SubmitDdlTaskRequest::new(
query_context,
task: DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
};
DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
);
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
@@ -2204,6 +2243,8 @@ fn convert_value(
#[cfg(test)]
mod test {
use std::time::Duration;
use session::context::{QueryContext, QueryContextBuilder};
use sql::dialect::GreptimeDbDialect;
use sql::parser::{ParseOptions, ParserContext};
@@ -2213,6 +2254,17 @@ mod test {
use super::*;
use crate::expr_helper;
#[test]
fn test_parse_ddl_options() {
let options = OptionMap::from([
("timeout".to_string(), "5m".to_string()),
("wait".to_string(), "false".to_string()),
]);
let ddl_options = parse_ddl_options(&options).unwrap();
assert!(!ddl_options.wait);
assert_eq!(Duration::from_secs(300), ddl_options.timeout);
}
#[test]
fn test_name_is_match() {
assert!(!NAME_PATTERN_REG.is_match("/adaf"));

View File

@@ -30,7 +30,8 @@ use crate::error::{self, InvalidColumnOptionSnafu, Result, SetFulltextOptionSnaf
use crate::parser::ParserContext;
use crate::parsers::create_parser::INVERTED;
use crate::parsers::utils::{
validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
parse_with_options, validate_column_fulltext_create_option,
validate_column_skipping_index_create_option,
};
use crate::statements::OptionMap;
use crate::statements::alter::{
@@ -181,7 +182,10 @@ impl ParserContext<'_> {
}
unexpected => self.unsupported(unexpected.to_string())?,
};
Ok(AlterTable::new(table_name, alter_operation))
let options = parse_with_options(&mut self.parser)?;
// TODO(weny): Respect the DDL options (e.g., WAIT and TIMEOUT) after the ALTER TABLE statement.
Ok(AlterTable::new(table_name, alter_operation, options))
}
fn parse_alter_table_unset(&mut self) -> Result<AlterTableOperation> {
@@ -1043,6 +1047,50 @@ ALTER TABLE metrics MERGE PARTITION (
}
}
#[test]
fn test_parse_alter_table_merge_partition_with_options() {
let sql = r#"
ALTER TABLE alter_repartition_table MERGE PARTITION (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
) WITH (
TIMEOUT = '5m',
WAIT = false
);"#;
let mut result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, result.len());
let statement = result.remove(0);
assert_matches!(statement, Statement::AlterTable { .. });
if let Statement::AlterTable(alter_table) = statement {
assert_matches!(
alter_table.alter_operation(),
AlterTableOperation::Repartition { .. }
);
if let AlterTableOperation::Repartition { operation } = alter_table.alter_operation() {
assert_eq!(operation.from_exprs.len(), 2);
assert_eq!(
operation.from_exprs[0].to_string(),
"device_id < 100 AND area < 'South'"
);
assert_eq!(
operation.from_exprs[1].to_string(),
"device_id < 100 AND area >= 'South'"
);
assert_eq!(operation.into_exprs.len(), 1);
}
// Verify WITH options are parsed
let options = alter_table.options().to_str_map();
assert_eq!(options.get("timeout").unwrap(), &"5m");
assert_eq!(options.get("wait").unwrap(), &"false");
assert_eq!(options.len(), 2);
}
}
#[test]
fn test_parse_alter_table_repartition_multiple() {
let sql = r#"

View File

@@ -31,19 +31,19 @@ use sqlparser::keywords::ALL_KEYWORDS;
use sqlparser::parser::IsOptional::Mandatory;
use sqlparser::parser::{Parser, ParserError};
use sqlparser::tokenizer::{Token, TokenWithSpan, Word};
use table::requests::{validate_database_option, validate_table_option};
use table::requests::validate_database_option;
use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
use crate::error::{
self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
InvalidSqlSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu,
UnexpectedSnafu, UnsupportedSnafu,
};
use crate::parser::{FLOW, ParserContext};
use crate::parsers::tql_parser;
use crate::parsers::utils::{
self, validate_column_fulltext_create_option, validate_column_skipping_index_create_option,
validate_column_vector_index_create_option,
self, parse_with_options, validate_column_fulltext_create_option,
validate_column_skipping_index_create_option, validate_column_vector_index_create_option,
};
use crate::statements::create::{
Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable,
@@ -447,17 +447,7 @@ impl<'a> ParserContext<'a> {
}
fn parse_create_table_options(&mut self) -> Result<OptionMap> {
let options = self
.parser
.parse_options(Keyword::WITH)
.context(SyntaxSnafu)?
.into_iter()
.map(parse_option_string)
.collect::<Result<HashMap<String, OptionValue>>>()?;
for key in options.keys() {
ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
}
Ok(OptionMap::new(options))
parse_with_options(&mut self.parser)
}
/// "PARTITION ON COLUMNS (...)" clause

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use chrono::Utc;
@@ -39,13 +40,18 @@ use datatypes::schema::{
};
use snafu::{ResultExt, ensure};
use sqlparser::dialect::Dialect;
use sqlparser::keywords::Keyword;
use sqlparser::parser::Parser;
use table::requests::validate_table_option;
use crate::error::{
ConvertToLogicalExpressionSnafu, InvalidSqlSnafu, ParseSqlValueSnafu, Result,
SimplificationSnafu,
ConvertToLogicalExpressionSnafu, InvalidSqlSnafu, InvalidTableOptionSnafu, ParseSqlValueSnafu,
Result, SimplificationSnafu, SyntaxSnafu,
};
use crate::parser::{ParseOptions, ParserContext};
use crate::statements::OptionMap;
use crate::statements::statement::Statement;
use crate::util::{OptionValue, parse_option_string};
/// Check if the given SQL query is a TQL statement.
pub fn is_tql(dialect: &dyn Dialect, sql: &str) -> Result<bool> {
@@ -272,6 +278,19 @@ pub fn convert_month_day_nano_to_duration(
Ok(std::time::Duration::new(adjusted_seconds, nanos_remainder))
}
pub fn parse_with_options(parser: &mut Parser) -> Result<OptionMap> {
let options = parser
.parse_options(Keyword::WITH)
.context(SyntaxSnafu)?
.into_iter()
.map(parse_option_string)
.collect::<Result<HashMap<String, OptionValue>>>()?;
for key in options.keys() {
ensure!(validate_table_option(key), InvalidTableOptionSnafu { key });
}
Ok(OptionMap::new(options))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -25,17 +25,26 @@ use serde::Serialize;
use sqlparser::ast::{ColumnDef, DataType, Expr, Ident, ObjectName, TableConstraint};
use sqlparser_derive::{Visit, VisitMut};
use crate::statements::OptionMap;
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub struct AlterTable {
pub table_name: ObjectName,
pub alter_operation: AlterTableOperation,
/// Table options in `WITH`. All keys are lowercase.
pub options: OptionMap,
}
impl AlterTable {
pub(crate) fn new(table_name: ObjectName, alter_operation: AlterTableOperation) -> Self {
pub(crate) fn new(
table_name: ObjectName,
alter_operation: AlterTableOperation,
options: OptionMap,
) -> Self {
Self {
table_name,
alter_operation,
options,
}
}
@@ -47,6 +56,10 @@ impl AlterTable {
&self.alter_operation
}
pub fn options(&self) -> &OptionMap {
&self.options
}
pub fn alter_operation_mut(&mut self) -> &mut AlterTableOperation {
&mut self.alter_operation
}

View File

@@ -90,6 +90,9 @@ impl OptionMap {
self.options.len() + self.secrets.len()
}
/// Convert the option map to a string map.
///
/// Notes: Not all values can be converted to a string, refer to [OptionValue::expr_as_string] for more details.
pub fn to_str_map(&self) -> HashMap<&str, &str> {
let mut map = HashMap::with_capacity(self.len());
map.extend(
@@ -105,6 +108,9 @@ impl OptionMap {
map
}
/// Convert the option map to a string map.
///
/// Notes: Not all values can be converted to a string, refer to [OptionValue::expr_as_string] for more details.
pub fn into_map(self) -> HashMap<String, String> {
let mut map = HashMap::with_capacity(self.len());
map.extend(

View File

@@ -87,6 +87,7 @@ impl OptionValue {
| Value::HexStringLiteral(s) => Some(s),
Value::DollarQuotedString(s) => Some(&s.value),
Value::Number(s, _) => Some(s),
Value::Boolean(b) => Some(if *b { "true" } else { "false" }),
_ => None,
},
Expr::Identifier(ident) => Some(&ident.value),
@@ -94,6 +95,9 @@ impl OptionValue {
}
}
/// Convert the option value to a string.
///
/// Notes: Not all values can be converted to a string, refer to [Self::expr_as_string] for more details.
pub fn as_string(&self) -> Option<&str> {
Self::expr_as_string(&self.0)
}

View File

@@ -75,6 +75,11 @@ pub const VALID_TABLE_OPTION_KEYS: [&str; 12] = [
OTLP_METRIC_COMPAT_KEY,
];
pub const DDL_TIMEOUT: &str = "timeout";
pub const DDL_WAIT: &str = "wait";
pub const VALID_DDL_OPTION_KEYS: [&str; 2] = [DDL_TIMEOUT, DDL_WAIT];
// Valid option keys when creating a db.
static VALID_DB_OPT_KEYS: Lazy<HashSet<&str>> = Lazy::new(|| {
let mut set = HashSet::new();
@@ -115,7 +120,7 @@ pub fn validate_table_option(key: &str) -> bool {
return true;
}
VALID_TABLE_OPTION_KEYS.contains(&key)
VALID_TABLE_OPTION_KEYS.contains(&key) || VALID_DDL_OPTION_KEYS.contains(&key)
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]

View File

@@ -200,7 +200,7 @@ impl GreptimeDbClusterBuilder {
guards: Vec<TestGuard>,
) -> GreptimeDbCluster {
let datanodes = datanode_options.len();
let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
let channel_config = ChannelConfig::new().timeout(Some(Duration::from_secs(20)));
let datanode_clients = Arc::new(NodeClients::new(channel_config));
let opt = MetasrvOptions {

View File

@@ -99,6 +99,41 @@ DROP TABLE alter_repartition_table;
Affected Rows: 0
CREATE TABLE alter_repartition_table_with_options(
device_id INT,
area STRING,
ty STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(device_id)
) PARTITION ON COLUMNS (device_id, area) (
device_id < 100,
device_id >= 100 AND device_id < 200,
device_id >= 200
);
Affected Rows: 0
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-4[0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}) PROC_ID
ALTER TABLE alter_repartition_table_with_options REPARTITION (
device_id < 100
) INTO (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
) WITH (
TIMEOUT = '5m',
WAIT = false
);
+--------------------------------------+
| Procedure ID |
+--------------------------------------+
| PROC_ID |
+--------------------------------------+
DROP TABLE alter_repartition_table_with_options;
Affected Rows: 0
-- Metric engine repartition test
CREATE TABLE metric_physical_table (
ts TIMESTAMP TIME INDEX,

View File

@@ -47,6 +47,31 @@ ALTER TABLE alter_repartition_table REPARTITION (
DROP TABLE alter_repartition_table;
CREATE TABLE alter_repartition_table_with_options(
device_id INT,
area STRING,
ty STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(device_id)
) PARTITION ON COLUMNS (device_id, area) (
device_id < 100,
device_id >= 100 AND device_id < 200,
device_id >= 200
);
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-4[0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}) PROC_ID
ALTER TABLE alter_repartition_table_with_options REPARTITION (
device_id < 100
) INTO (
device_id < 100 AND area < 'South',
device_id < 100 AND area >= 'South'
) WITH (
TIMEOUT = '5m',
WAIT = false
);
DROP TABLE alter_repartition_table_with_options;
-- Metric engine repartition test
CREATE TABLE metric_physical_table (
ts TIMESTAMP TIME INDEX,