diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 046a132ec3..df460fde17 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -25,6 +25,12 @@ use store_api::storage::RegionNumber; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Execute the operation timeout, source: {}", source))] + Timeout { + location: Location, + source: tokio::time::error::Elapsed, + }, + #[snafu(display("Failed to handle heartbeat response, source: {}", source))] HandleHeartbeatResponse { location: Location, @@ -633,7 +639,8 @@ impl ErrorExt for Error { | Error::FindRegionRoute { .. } | Error::BuildDfLogicalPlan { .. } | Error::BuildTableMeta { .. } - | Error::VectorToGrpcColumn { .. } => StatusCode::Internal, + | Error::VectorToGrpcColumn { .. } + | Error::Timeout { .. } => StatusCode::Internal, Error::IncompleteGrpcResult { .. } | Error::ContextValueNotFound { .. } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 43fd1fdaab..54e8ea5e2d 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -237,6 +237,7 @@ impl Instance { .enable_router() .enable_store() .enable_heartbeat() + .enable_ddl() .channel_manager(channel_manager) .build(); meta_client diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index c819a76c22..d190514eaa 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -16,6 +16,7 @@ pub(crate) mod inserter; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use api::helper::ColumnDataTypeWrapper; use api::v1::ddl_request::Expr as DdlExpr; @@ -33,14 +34,14 @@ use client::Database; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_error::prelude::BoxedError; +use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{ - CreateRequest as MetaCreateRequest, DeleteRequest as MetaDeleteRequest, - Partition as MetaPartition, RouteRequest, RouteResponse, + DeleteRequest as MetaDeleteRequest, Partition as MetaPartition, RouteRequest, }; use common_meta::rpc::store::CompareAndPutRequest; use common_meta::table_name::TableName; use common_query::Output; -use common_telemetry::{debug, info, warn}; +use common_telemetry::{debug, info}; use datanode::instance::sql::table_idents_to_full_name; use datanode::sql::SqlHandler; use datatypes::prelude::ConcreteDataType; @@ -63,6 +64,7 @@ use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use table::requests::TableOptions; use table::table::AlterContext; use table::TableRef; +use tokio::time::timeout; use crate::catalog::FrontendCatalogManager; use crate::error::{ @@ -98,17 +100,6 @@ impl DistInstance { } } - async fn find_table(&self, table_name: &TableName) -> Result> { - self.catalog_manager - .table( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - ) - .await - .context(CatalogSnafu) - } - pub async fn create_table( &self, create_table: &mut CreateTableExpr, @@ -121,56 +112,14 @@ impl DistInstance { &create_table.table_name, ); - if let Some(table) = self.find_table(&table_name).await? { - return if create_table.create_if_not_exists { - Ok(table) - } else { - TableAlreadyExistSnafu { - table: table_name.to_string(), - } - .fail() - }; - } - let mut table_info = create_table_info(create_table)?; - let response = self - .create_table_in_meta(create_table, partitions, &table_info) - .await; - let response = match response { - Ok(response) => response, - Err(e) => { - return if let Some(table) = self.find_table(&table_name).await? { - warn!("Table '{table_name}' is created concurrently by other Frontend nodes!"); - Ok(table) - } else { - Err(e) - } - } - }; + let resp = self + .create_table_procedure(create_table, partitions, table_info.clone()) + .await?; - let table_routes = response.table_routes; - ensure!( - table_routes.len() == 1, - error::CreateTableRouteSnafu { - table_name: create_table.table_name.to_string() - } - ); - let table_route = table_routes.first().unwrap(); - info!( - "Creating distributed table {table_name} with table routes: {}", - serde_json::to_string_pretty(table_route) - .unwrap_or_else(|_| format!("{table_route:#?}")) - ); - let region_routes = &table_route.region_routes; - ensure!( - !region_routes.is_empty(), - error::FindRegionRouteSnafu { - table_name: create_table.table_name.to_string() - } - ); - - let table_id = table_route.table.id as u32; + let table_id = resp.table_id; + info!("Successfully created distributed table '{table_name}' with table id {table_id}"); table_info.ident.table_id = table_id; let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?); @@ -199,26 +148,6 @@ impl DistInstance { } ); - for datanode in table_route.find_leaders() { - let client = self.datanode_clients.get_client(&datanode).await; - let client = Database::new(&table_name.catalog_name, &table_name.schema_name, client); - - let regions = table_route.find_leader_regions(&datanode); - let mut create_expr_for_region = create_table.clone(); - create_expr_for_region.region_numbers = regions; - - debug!( - "Creating table {:?} on Datanode {:?} with regions {:?}", - create_table, datanode, create_expr_for_region.region_numbers, - ); - - let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE_IN_DATANODE); - let _ = client - .create(create_expr_for_region) - .await - .context(RequestDatanodeSnafu)?; - } - // Since the table information created on meta does not go through KvBackend, so we // manually invalidate the cache here. // @@ -554,33 +483,27 @@ impl DistInstance { Ok(Output::AffectedRows(0)) } - async fn create_table_in_meta( + async fn create_table_procedure( &self, create_table: &CreateTableExpr, partitions: Option, - table_info: &RawTableInfo, - ) -> Result { - let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE_IN_META); - let mut catalog_name = create_table.catalog_name.clone(); - if catalog_name.is_empty() { - catalog_name = DEFAULT_CATALOG_NAME.to_string(); - } - let mut schema_name = create_table.schema_name.clone(); - if schema_name.is_empty() { - schema_name = DEFAULT_SCHEMA_NAME.to_string(); - } - let table_name = TableName::new(catalog_name, schema_name, create_table.table_name.clone()); - + table_info: RawTableInfo, + ) -> Result { let partitions = parse_partitions(create_table, partitions)?; - let request = MetaCreateRequest { - table_name, - partitions, - table_info, + let partitions = partitions.into_iter().map(Into::into).collect(); + + let request = SubmitDdlTaskRequest { + task: DdlTask::new_create_table(create_table.clone(), partitions, table_info), }; - self.meta_client - .create_route(request) - .await - .context(RequestMetaSnafu) + + timeout( + // TODO(weny): makes timeout configurable. + Duration::from_secs(10), + self.meta_client.submit_ddl_task(request), + ) + .await + .context(error::TimeoutSnafu)? + .context(error::RequestMetaSnafu) } async fn handle_dist_insert( diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index cb7745d8c0..1cd3d290ca 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -20,8 +20,6 @@ pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed" /// frontend metrics /// Metrics for creating table in dist mode. pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table"; -pub const DIST_CREATE_TABLE_IN_META: &str = "frontend.dist.create_table.update_meta"; -pub const DIST_CREATE_TABLE_IN_DATANODE: &str = "frontend.dist.create_table.invoke_datanode"; pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows"; /// The samples count of Prometheus remote write. diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 5b871a99b4..c04031589a 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -21,6 +21,7 @@ mod store; use api::v1::meta::Role; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest}; use common_meta::rpc::router::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse}; use common_meta::rpc::store::{ @@ -185,11 +186,14 @@ impl MetaClient { client.start(urls.clone()).await?; info!("Store client started"); } - if let Some(client) = &mut self.lock { - client.start(urls).await?; + client.start(urls.clone()).await?; info!("Lock client started"); } + if let Some(client) = &mut self.ddl { + client.start(urls).await?; + info!("Ddl client started"); + } Ok(()) } @@ -348,6 +352,20 @@ impl MetaClient { Ok(()) } + pub async fn submit_ddl_task( + &self, + req: SubmitDdlTaskRequest, + ) -> Result { + let res = self + .ddl_client()? + .submit_ddl_task(req.try_into().context(error::ConvertMetaRequestSnafu)?) + .await? + .try_into() + .context(error::ConvertMetaResponseSnafu)?; + + Ok(res) + } + #[inline] pub fn heartbeat_client(&self) -> Result { self.heartbeat.clone().context(error::NotStartedSnafu { @@ -376,6 +394,13 @@ impl MetaClient { }) } + #[inline] + pub fn ddl_client(&self) -> Result { + self.ddl + .clone() + .context(error::NotStartedSnafu { name: "ddl_client" }) + } + #[inline] pub fn channel_config(&self) -> &ChannelConfig { self.channel_manager.config() diff --git a/src/meta-client/src/client/ddl.rs b/src/meta-client/src/client/ddl.rs index 510e0e6f2f..90191dd519 100644 --- a/src/meta-client/src/client/ddl.rs +++ b/src/meta-client/src/client/ddl.rs @@ -27,14 +27,10 @@ use crate::error; use crate::error::Result; #[derive(Clone, Debug)] -// TODO(weny): removes this in following PRs. -#[allow(unused)] pub struct Client { inner: Arc>, } -// TODO(weny): removes this in following PRs. -#[allow(dead_code)] impl Client { pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { let inner = Arc::new(RwLock::new(Inner { diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index f0da4120b2..741a534a4b 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use api::v1::meta::cluster_server::ClusterServer; +use api::v1::meta::ddl_task_server::DdlTaskServer; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::lock_server::LockServer; use api::v1::meta::router_server::RouterServer; @@ -147,6 +148,7 @@ pub fn router(meta_srv: MetaSrv) -> Router { .add_service(StoreServer::new(meta_srv.clone())) .add_service(ClusterServer::new(meta_srv.clone())) .add_service(LockServer::new(meta_srv.clone())) + .add_service(DdlTaskServer::new(meta_srv.clone())) .add_service(admin::make_admin_service(meta_srv)) } diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs index ba84055feb..e165617ae3 100644 --- a/src/meta-srv/src/ddl.rs +++ b/src/meta-srv/src/ddl.rs @@ -36,8 +36,6 @@ pub struct DdlManager { server_addr: String, } -// TODO(weny): removes in following PRs. -#[allow(unused)] #[derive(Clone)] pub(crate) struct DdlContext { pub(crate) kv_store: KvStoreRef, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 86ab963c20..e338e16fb5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -53,6 +53,7 @@ pub struct MetaSrvBuilder { meta_peer_client: Option, lock: Option, metadata_service: Option, + datanode_clients: Option>, } impl MetaSrvBuilder { @@ -67,6 +68,7 @@ impl MetaSrvBuilder { options: None, lock: None, metadata_service: None, + datanode_clients: None, } } @@ -115,6 +117,11 @@ impl MetaSrvBuilder { self } + pub fn datanode_clients(mut self, clients: Arc) -> Self { + self.datanode_clients = Some(clients); + self + } + pub async fn build(self) -> Result { let started = Arc::new(AtomicBool::new(false)); @@ -128,6 +135,7 @@ impl MetaSrvBuilder { handler_group, lock, metadata_service, + datanode_clients, } = self; let options = options.unwrap_or_default(); @@ -162,7 +170,7 @@ impl MetaSrvBuilder { let ddl_manager = Arc::new(DdlManager::new( procedure_manager.clone(), kv_store.clone(), - Arc::new(DatanodeClients::default()), + datanode_clients.unwrap_or_else(|| Arc::new(DatanodeClients::default())), mailbox.clone(), options.server_addr.clone(), )); diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index cac6598991..0cb93e47b4 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -18,3 +18,8 @@ pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request"; pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute"; + +pub(crate) const METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_META: &str = + "meta.procedure.create_table.create_meta"; +pub(crate) const METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_TABLE: &str = + "meta.procedure.create_table.create_table"; diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index d9b1406040..c6c38fbbdf 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -15,9 +15,11 @@ use std::sync::Arc; use std::time::Duration; +use api::v1::meta::ddl_task_server::DdlTaskServer; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; +use client::client_manager::DatanodeClients; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use tower::service_fn; @@ -38,23 +40,24 @@ pub struct MockInfo { pub async fn mock_with_memstore() -> MockInfo { let kv_store = Arc::new(MemStore::default()); - mock(Default::default(), kv_store, None).await + mock(Default::default(), kv_store, None, None).await } pub async fn mock_with_etcdstore(addr: &str) -> MockInfo { let kv_store = EtcdStore::with_endpoints([addr]).await.unwrap(); - mock(Default::default(), kv_store, None).await + mock(Default::default(), kv_store, None, None).await } pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo { let kv_store = Arc::new(MemStore::default()); - mock(Default::default(), kv_store, Some(selector)).await + mock(Default::default(), kv_store, Some(selector), None).await } pub async fn mock( opts: MetaSrvOptions, kv_store: KvStoreRef, selector: Option, + datanode_clients: Option>, ) -> MockInfo { let server_addr = opts.server_addr.clone(); @@ -72,6 +75,11 @@ pub async fn mock( None => builder, }; + let builder = match datanode_clients { + Some(clients) => builder.datanode_clients(clients), + None => builder, + }; + let meta_srv = builder.build().await.unwrap(); meta_srv.try_start().await.unwrap(); @@ -82,6 +90,7 @@ pub async fn mock( .add_service(HeartbeatServer::new(service.clone())) .add_service(RouterServer::new(service.clone())) .add_service(StoreServer::new(service.clone())) + .add_service(DdlTaskServer::new(service.clone())) .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await }); diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/meta-srv/src/procedure/create_table.rs index f3a98eb225..c992d03a3a 100644 --- a/src/meta-srv/src/procedure/create_table.rs +++ b/src/meta-srv/src/procedure/create_table.rs @@ -37,15 +37,11 @@ use crate::error::{self, Result}; use crate::service::router::create_table_global_value; use crate::table_routes::get_table_global_value; -// TODO(weny): removes in following PRs. -#[allow(unused)] pub struct CreateTableProcedure { context: DdlContext, creator: TableCreator, } -// TODO(weny): removes in following PRs. -#[allow(dead_code)] impl CreateTableProcedure { pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable"; @@ -105,6 +101,9 @@ impl CreateTableProcedure { /// registers the `TableRouteValue`,`TableGlobalValue` async fn register_metadata(&self) -> Result<()> { + let _timer = common_telemetry::timer!( + crate::metrics::METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_META + ); let table_name = self.table_name(); let table_id = self.creator.data.table_route.table.id as TableId; @@ -195,8 +194,10 @@ impl CreateTableProcedure { } async fn on_datanode_create_table(&mut self) -> Result { + let _timer = common_telemetry::timer!( + crate::metrics::METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_TABLE + ); let table_route = &self.creator.data.table_route; - let table_name = self.table_name(); let clients = self.context.datanode_clients.clone(); let leaders = table_route.find_leaders(); @@ -209,6 +210,9 @@ impl CreateTableProcedure { let regions = table_route.find_leader_regions(&datanode); let mut create_expr_for_region = self.creator.data.task.create_table.clone(); create_expr_for_region.region_numbers = regions; + create_expr_for_region.table_id = Some(api::v1::TableId { + id: table_route.table.id as u32, + }); joins.push(common_runtime::spawn_bg(async move { if let Err(err) = client.create(create_expr_for_region).await { @@ -280,7 +284,7 @@ impl TableCreator { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] enum CreateTableState { /// Prepares to create the table Prepare, diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 8e250c398c..53e31c2a26 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -88,12 +88,14 @@ impl GreptimeDbClusterBuilder { pub async fn build(self) -> GreptimeDbCluster { let datanodes = self.datanodes.unwrap_or(4); - let meta_srv = self.build_metasrv().await; + let datanode_clients = Arc::new(DatanodeClients::default()); + + let meta_srv = self.build_metasrv(datanode_clients.clone()).await; let (datanode_instances, heartbeat_tasks, storage_guards, wal_guards) = self.build_datanodes(meta_srv.clone(), datanodes).await; - let datanode_clients = build_datanode_clients(&datanode_instances, datanodes).await; + build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await; self.wait_datanodes_alive(&meta_srv.meta_srv.meta_peer_client(), datanodes) .await; @@ -115,8 +117,14 @@ impl GreptimeDbClusterBuilder { } } - async fn build_metasrv(&self) -> MockInfo { - meta_srv::mocks::mock(MetaSrvOptions::default(), self.kv_store.clone(), None).await + async fn build_metasrv(&self, datanode_clients: Arc) -> MockInfo { + meta_srv::mocks::mock( + MetaSrvOptions::default(), + self.kv_store.clone(), + None, + Some(datanode_clients), + ) + .await } async fn build_datanodes( @@ -218,6 +226,7 @@ impl GreptimeDbClusterBuilder { .enable_store() .enable_heartbeat() .channel_manager(meta_srv.channel_manager) + .enable_ddl() .build(); meta_client.start(&[&meta_srv.server_addr]).await.unwrap(); let meta_client = Arc::new(meta_client); @@ -238,10 +247,10 @@ impl GreptimeDbClusterBuilder { } async fn build_datanode_clients( + clients: Arc, instances: &HashMap>, datanodes: u32, -) -> Arc { - let clients = Arc::new(DatanodeClients::default()); +) { for i in 0..datanodes { let datanode_id = i as u64 + 1; let instance = instances.get(&datanode_id).cloned().unwrap(); @@ -250,7 +259,6 @@ async fn build_datanode_clients( .insert_client(Peer::new(datanode_id, addr), client) .await; } - clients } async fn create_datanode_client(datanode_instance: Arc) -> (String, Client) {