diff --git a/Cargo.lock b/Cargo.lock
index 60dfd87f06..b4bbb6c105 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3894,7 +3894,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e152fcbf173b8759dd8a91ce7f6f4b0ca987828e#e152fcbf173b8759dd8a91ce7f6f4b0ca987828e"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=65c1364d8ee190a8d05cad5758d478b11eff2d35#65c1364d8ee190a8d05cad5758d478b11eff2d35"
 dependencies = [
  "prost 0.12.4",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 526c6a46bf..ed8341d814 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e152fcbf173b8759dd8a91ce7f6f4b0ca987828e" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "65c1364d8ee190a8d05cad5758d478b11eff2d35" }
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"
diff --git a/src/common/catalog/src/lib.rs b/src/common/catalog/src/lib.rs
index e1cf4c201d..8ef85a300f 100644
--- a/src/common/catalog/src/lib.rs
+++ b/src/common/catalog/src/lib.rs
@@ -28,6 +28,12 @@ pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> String {
     format!("{catalog}.{schema}.{table}")
 }
 
+/// Formats the fully-qualified flow name
+#[inline]
+pub fn format_full_flow_name(catalog: &str, flow: &str) -> String {
+    format!("{catalog}.{flow}")
+}
+
 /// Build db name from catalog and schema string
 pub fn build_db_string(catalog: &str, schema: &str) -> String {
     if catalog == DEFAULT_CATALOG_NAME {
diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs
index 6f17221729..a9d61eed56 100644
--- a/src/common/error/src/status_code.rs
+++ b/src/common/error/src/status_code.rs
@@ -97,6 +97,11 @@ pub enum StatusCode {
     /// User is not authorized to perform the operation
     PermissionDenied = 7006,
     // ====== End of auth related status code =====
+
+    // ====== Begin of flow related status code =====
+    FlowAlreadyExists = 8000,
+    FlowNotFound = 8001,
+    // ====== End of flow related status code =====
 }
 
 impl StatusCode {
@@ -125,8 +130,10 @@ impl StatusCode {
             | StatusCode::EngineExecuteQuery
             | StatusCode::TableAlreadyExists
             | StatusCode::TableNotFound
-            | StatusCode::RegionNotFound
             | StatusCode::RegionAlreadyExists
+            | StatusCode::RegionNotFound
+            | StatusCode::FlowAlreadyExists
+            | StatusCode::FlowNotFound
             | StatusCode::RegionReadonly
             | StatusCode::TableColumnNotFound
             | StatusCode::TableColumnExists
@@ -161,10 +168,12 @@ impl StatusCode {
             | StatusCode::InvalidSyntax
             | StatusCode::TableAlreadyExists
             | StatusCode::TableNotFound
+            | StatusCode::RegionAlreadyExists
             | StatusCode::RegionNotFound
+            | StatusCode::FlowAlreadyExists
+            | StatusCode::FlowNotFound
             | StatusCode::RegionNotReady
             | StatusCode::RegionBusy
-            | StatusCode::RegionAlreadyExists
             | StatusCode::RegionReadonly
             | StatusCode::TableColumnNotFound
             | StatusCode::TableColumnExists
diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs
index c45afcc195..75da644fff 100644
--- a/src/common/meta/src/cluster.rs
+++ b/src/common/meta/src/cluster.rs
@@ -25,6 +25,7 @@ use crate::error::{
     InvalidRoleSnafu, ParseNumSnafu, Result,
 };
 use crate::peer::Peer;
+use crate::ClusterId;
 
 const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
 
@@ -55,7 +56,7 @@ pub trait ClusterInfo {
 #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
 pub struct NodeInfoKey {
     /// The cluster id.
-    pub cluster_id: u64,
+    pub cluster_id: ClusterId,
     /// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
     pub role: Role,
     /// The node id.
@@ -67,7 +68,7 @@ impl NodeInfoKey {
         format!("{}-{}-", CLUSTER_NODE_INFO_PREFIX, cluster_id)
     }
 
-    pub fn key_prefix_with_role(cluster_id: u64, role: Role) -> String {
+    pub fn key_prefix_with_role(cluster_id: ClusterId, role: Role) -> String {
         format!(
             "{}-{}-{}-",
             CLUSTER_NODE_INFO_PREFIX,
diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs
index bc4563b2f5..a922ce02d9 100644
--- a/src/common/meta/src/ddl.rs
+++ b/src/common/meta/src/ddl.rs
@@ -29,6 +29,7 @@ use crate::node_manager::NodeManagerRef;
 use crate::region_keeper::MemoryRegionKeeperRef;
 use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
 use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
+use crate::ClusterId;
 
 pub mod alter_logical_tables;
 pub mod alter_table;
@@ -38,6 +39,7 @@ pub mod create_logical_tables;
 pub mod create_table;
 mod create_table_template;
 pub mod drop_database;
+pub mod drop_flow;
 pub mod drop_table;
 pub mod flow_meta;
 mod physical_table_metadata;
@@ -83,7 +85,7 @@ pub trait ProcedureExecutor: Send + Sync {
 pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
 
 pub struct TableMetadataAllocatorContext {
-    pub cluster_id: u64,
+    pub cluster_id: ClusterId,
 }
 
 /// Metadata allocated to a table.
diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs
index 31a4fd4af1..209ed38122 100644
--- a/src/common/meta/src/ddl/alter_table.rs
+++ b/src/common/meta/src/ddl/alter_table.rs
@@ -45,9 +45,9 @@ use crate::instruction::CacheIdent;
 use crate::key::table_info::TableInfoValue;
 use crate::key::DeserializedValueWithBytes;
 use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
-use crate::metrics;
 use crate::rpc::ddl::AlterTableTask;
 use crate::rpc::router::{find_leader_regions, find_leaders};
+use crate::{metrics, ClusterId};
 
 /// The alter table procedure
 pub struct AlterTableProcedure {
@@ -61,7 +61,7 @@ impl AlterTableProcedure {
     pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
 
     pub fn new(
-        cluster_id: u64,
+        cluster_id: ClusterId,
         table_id: TableId,
         task: AlterTableTask,
         context: DdlContext,
@@ -269,7 +269,7 @@ enum AlterTableState {
 // The serialized data of alter table.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct AlterTableData {
-    cluster_id: u64,
+    cluster_id: ClusterId,
     state: AlterTableState,
     task: AlterTableTask,
     table_id: TableId,
diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs
index 20e46bfaa8..4678504aca 100644
--- a/src/common/meta/src/ddl/create_flow.rs
+++ b/src/common/meta/src/ddl/create_flow.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
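A note on the recurring `u64` → `ClusterId` substitutions in this patch: the alias itself lives in `common_meta`'s crate root (it is only imported here, its definition is not part of the diff), so the change is purely self-documenting with no ABI or serialization impact. A minimal sketch, assuming the alias is a plain `u64`:

```rust
// Assumption: the definition in src/common/meta/src/lib.rs is a plain alias.
pub type ClusterId = u64;

// Signatures such as `key_prefix_with_role(cluster_id: ClusterId, ...)` now
// state their intent while remaining wire-compatible with the old `u64`.
fn key_prefix(cluster_id: ClusterId) -> String {
    format!("__meta_cluster_node_info-{cluster_id}-")
}

fn main() {
    assert_eq!(key_prefix(1), "__meta_cluster_node_info-1-");
}
```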
-mod check;
 mod metadata;
 
 use std::collections::BTreeMap;
@@ -20,6 +19,7 @@
 use api::v1::flow::flow_request::Body as PbFlowRequest;
 use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
 use async_trait::async_trait;
+use common_catalog::format_full_flow_name;
 use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
 use common_procedure::{
     Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
@@ -29,15 +29,16 @@
 use common_telemetry::tracing_context::TracingContext;
 use futures::future::join_all;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
-use snafu::ResultExt;
+use snafu::{ensure, ResultExt};
 use strum::AsRefStr;
 use table::metadata::TableId;
 
 use super::utils::add_peer_context_if_needed;
 use crate::ddl::utils::handle_retry_error;
 use crate::ddl::DdlContext;
-use crate::error::Result;
+use crate::error::{self, Result};
 use crate::key::flow::flow_info::FlowInfoValue;
+use crate::key::table_name::TableNameKey;
 use crate::key::FlowId;
 use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
 use crate::peer::Peer;
@@ -68,8 +69,8 @@ impl CreateFlowProcedure {
                 flow_id: None,
                 peers: vec![],
                 source_table_ids: vec![],
-                state: CreateFlowState::CreateMetadata,
                 query_context,
+                state: CreateFlowState::Prepare,
             },
         }
     }
@@ -80,8 +81,49 @@ impl CreateFlowProcedure {
         Ok(CreateFlowProcedure { context, data })
     }
 
-    async fn on_prepare(&mut self) -> Result<Status> {
-        self.check_creation().await?;
+    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
+        let catalog_name = &self.data.task.catalog_name;
+        let flow_name = &self.data.task.flow_name;
+        let sink_table_name = &self.data.task.sink_table_name;
+        let create_if_not_exists = self.data.task.create_if_not_exists;
+
+        let flow_name_value = self
+            .context
+            .flow_metadata_manager
+            .flow_name_manager()
+            .get(catalog_name, flow_name)
+            .await?;
+
+        if let Some(value) = flow_name_value {
+            ensure!(
+                create_if_not_exists,
+                error::FlowAlreadyExistsSnafu {
+                    flow_name: format_full_flow_name(catalog_name, flow_name),
+                }
+            );
+
+            let flow_id = value.flow_id();
+            return Ok(Status::done_with_output(flow_id));
+        }
+
+        // Ensures sink table doesn't exist.
+        let exists = self
+            .context
+            .table_metadata_manager
+            .table_name_manager()
+            .exists(TableNameKey::new(
+                &sink_table_name.catalog_name,
+                &sink_table_name.schema_name,
+                &sink_table_name.table_name,
+            ))
+            .await?;
+        ensure!(
+            !exists,
+            error::TableAlreadyExistsSnafu {
+                table_name: sink_table_name.to_string(),
+            }
+        );
+
         self.collect_source_tables().await?;
         self.allocate_flow_id().await?;
         self.data.state = CreateFlowState::CreateFlows;
@@ -207,7 +249,7 @@ impl From<&CreateFlowData> for CreateRequest {
         let source_table_ids = &value.source_table_ids;
 
         CreateRequest {
-            flow_id: Some(api::v1::flow::TaskId { id: flow_id }),
+            flow_id: Some(api::v1::FlowId { id: flow_id }),
             source_table_ids: source_table_ids
                 .iter()
                 .map(|table_id| api::v1::TableId { id: *table_id })
diff --git a/src/common/meta/src/ddl/create_flow/check.rs b/src/common/meta/src/ddl/create_flow/check.rs
deleted file mode 100644
index 27d8107991..0000000000
--- a/src/common/meta/src/ddl/create_flow/check.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use snafu::ensure;
-
-use crate::ddl::create_flow::CreateFlowProcedure;
-use crate::error::{self, Result};
-use crate::key::table_name::TableNameKey;
-
-impl CreateFlowProcedure {
-    /// Checks:
-    /// - The new task name doesn't exist.
-    /// - The sink table doesn't exist.
-    pub(crate) async fn check_creation(&self) -> Result<()> {
-        let catalog_name = &self.data.task.catalog_name;
-        let flow_name = &self.data.task.flow_name;
-        let sink_table_name = &self.data.task.sink_table_name;
-
-        // Ensures the task name doesn't exist.
-        let exists = self
-            .context
-            .flow_metadata_manager
-            .flow_name_manager()
-            .exists(catalog_name, flow_name)
-            .await?;
-        ensure!(
-            !exists,
-            error::FlowAlreadyExistsSnafu {
-                flow_name: format!("{}.{}", catalog_name, flow_name),
-            }
-        );
-
-        // Ensures sink table doesn't exist.
-        let exists = self
-            .context
-            .table_metadata_manager
-            .table_name_manager()
-            .exists(TableNameKey::new(
-                &sink_table_name.catalog_name,
-                &sink_table_name.schema_name,
-                &sink_table_name.table_name,
-            ))
-            .await?;
-        ensure!(
-            !exists,
-            error::TableAlreadyExistsSnafu {
-                table_name: sink_table_name.to_string(),
-            }
-        );
-
-        Ok(())
-    }
-}
diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index 803a30e24e..d0b889609a 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -339,7 +339,7 @@ pub struct TableCreator {
 }
 
 impl TableCreator {
-    pub fn new(cluster_id: u64, task: CreateTableTask) -> Self {
+    pub fn new(cluster_id: ClusterId, task: CreateTableTask) -> Self {
         Self {
             data: CreateTableData {
                 state: CreateTableState::Prepare,
diff --git a/src/common/meta/src/ddl/drop_flow.rs b/src/common/meta/src/ddl/drop_flow.rs
new file mode 100644
index 0000000000..1a32781a9f
--- /dev/null
+++ b/src/common/meta/src/ddl/drop_flow.rs
@@ -0,0 +1,213 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
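The checks from the deleted `check.rs` above now live inline in `on_prepare` (see the `create_flow.rs` hunks earlier in this diff). Both versions lean on snafu's `ensure!` macro with generated context selectors; a self-contained sketch of that pattern, with a hypothetical error enum standing in for the crate's real `error::Error`:

```rust
use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    // Mirrors the shape of the `FlowAlreadyExistsSnafu` selector used above;
    // this enum and variant are illustrative, not the crate's real ones.
    #[snafu(display("Flow already exists: {flow_name}"))]
    FlowAlreadyExists { flow_name: String },
}

fn check_not_exists(exists: bool, flow_name: &str) -> Result<(), Error> {
    // `ensure!` returns early with the selector-built error when the
    // condition is false, which is how the prepare steps bail out.
    ensure!(!exists, FlowAlreadyExistsSnafu { flow_name });
    Ok(())
}

fn main() {
    assert!(check_not_exists(true, "greptime.my_flow").is_err());
    assert!(check_not_exists(false, "greptime.my_flow").is_ok());
}
```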
+
+mod metadata;
+use api::v1::flow::{flow_request, DropRequest, FlowRequest};
+use async_trait::async_trait;
+use common_catalog::format_full_flow_name;
+use common_error::ext::ErrorExt;
+use common_error::status_code::StatusCode;
+use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
+use common_procedure::{
+    Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
+};
+use common_telemetry::info;
+use futures::future::join_all;
+use serde::{Deserialize, Serialize};
+use snafu::{ensure, ResultExt};
+use strum::AsRefStr;
+
+use super::utils::{add_peer_context_if_needed, handle_retry_error};
+use crate::ddl::DdlContext;
+use crate::error::{self, Result};
+use crate::key::flow::flow_info::FlowInfoValue;
+use crate::lock_key::{CatalogLock, FlowLock};
+use crate::peer::Peer;
+use crate::rpc::ddl::DropFlowTask;
+use crate::{metrics, ClusterId};
+
+/// The procedure for dropping a flow.
+pub struct DropFlowProcedure {
+    /// The context of procedure runtime.
+    pub(crate) context: DdlContext,
+    /// The serializable data.
+    pub(crate) data: DropFlowData,
+}
+
+impl DropFlowProcedure {
+    pub const TYPE_NAME: &'static str = "metasrv-procedure::DropFlow";
+
+    pub fn new(cluster_id: ClusterId, task: DropFlowTask, context: DdlContext) -> Self {
+        Self {
+            context,
+            data: DropFlowData {
+                state: DropFlowState::Prepare,
+                cluster_id,
+                task,
+                flow_info_value: None,
+            },
+        }
+    }
+
+    pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
+        let data: DropFlowData = serde_json::from_str(json).context(FromJsonSnafu)?;
+
+        Ok(Self { context, data })
+    }
+
+    /// Checks whether the flow exists.
+    /// - Returns early if the flow does not exist and `drop_if_exists` is `true`.
+    /// - Throws an error if the flow does not exist and `drop_if_exists` is `false`.
+    pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
+        let catalog_name = &self.data.task.catalog_name;
+        let flow_name = &self.data.task.flow_name;
+        let exists = self
+            .context
+            .flow_metadata_manager
+            .flow_name_manager()
+            .exists(catalog_name, flow_name)
+            .await?;
+
+        if !exists && self.data.task.drop_if_exists {
+            return Ok(Status::done());
+        }
+
+        ensure!(
+            exists,
+            error::FlowNotFoundSnafu {
+                flow_name: format_full_flow_name(catalog_name, flow_name)
+            }
+        );
+
+        self.fill_flow_metadata().await?;
+        self.data.state = DropFlowState::DeleteMetadata;
+        Ok(Status::executing(true))
+    }
+
+    async fn on_flownode_drop_flows(&self) -> Result<Status> {
+        // Safety: checked
+        let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
+        let flow_id = self.data.task.flow_id;
+        let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
+
+        for flownode in flownode_ids.values() {
+            // TODO(weny): use the real peer.
+            let peer = Peer::new(*flownode, "");
+            let requester = self.context.node_manager.flownode(&peer).await;
+            let request = FlowRequest {
+                body: Some(flow_request::Body::Drop(DropRequest {
+                    flow_id: Some(api::v1::FlowId { id: flow_id }),
+                })),
+                ..Default::default()
+            };
+
+            drop_flow_tasks.push(async move {
+                if let Err(err) = requester.handle(request).await {
+                    if err.status_code() != StatusCode::FlowNotFound {
+                        return Err(add_peer_context_if_needed(peer)(err));
+                    }
+                }
+                Ok(())
+            });
+        }
+
+        join_all(drop_flow_tasks)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Status::done())
+    }
+
+    async fn on_delete_metadata(&mut self) -> Result<Status> {
+        let flow_id = self.data.task.flow_id;
+        self.context
+            .flow_metadata_manager
+            .destroy_flow_metadata(
+                flow_id,
+                // Safety: checked
+                self.data.flow_info_value.as_ref().unwrap(),
+            )
+            .await?;
+        info!("Deleted flow metadata for flow {flow_id}");
+        self.data.state = DropFlowState::InvalidateFlowCache;
+        Ok(Status::executing(true))
+    }
+
+    async fn on_broadcast(&mut self) -> Result<Status> {
+        // TODO(weny): invalidates cache.
+        self.data.state = DropFlowState::DropFlows;
+        Ok(Status::executing(true))
+    }
+}
+
+#[async_trait]
+impl Procedure for DropFlowProcedure {
+    fn type_name(&self) -> &str {
+        Self::TYPE_NAME
+    }
+
+    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
+        let state = &self.data.state;
+        let _timer = metrics::METRIC_META_PROCEDURE_DROP_FLOW
+            .with_label_values(&[state.as_ref()])
+            .start_timer();
+
+        match self.data.state {
+            DropFlowState::Prepare => self.on_prepare().await,
+            DropFlowState::DeleteMetadata => self.on_delete_metadata().await,
+            DropFlowState::InvalidateFlowCache => self.on_broadcast().await,
+            DropFlowState::DropFlows => self.on_flownode_drop_flows().await,
+        }
+        .map_err(handle_retry_error)
+    }
+
+    fn dump(&self) -> ProcedureResult<String> {
+        serde_json::to_string(&self.data).context(ToJsonSnafu)
+    }
+
+    fn lock_key(&self) -> LockKey {
+        let catalog_name = &self.data.task.catalog_name;
+        let flow_id = self.data.task.flow_id;
+
+        let lock_key = vec![
+            CatalogLock::Read(catalog_name).into(),
+            FlowLock::Write(flow_id).into(),
+        ];
+
+        LockKey::new(lock_key)
+    }
+}
+
+/// The serializable data
+#[derive(Debug, Serialize, Deserialize)]
+pub(crate) struct DropFlowData {
+    state: DropFlowState,
+    cluster_id: ClusterId,
+    task: DropFlowTask,
+    pub(crate) flow_info_value: Option<FlowInfoValue>,
+}
+
+/// The state of drop flow
+#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
+enum DropFlowState {
+    /// Prepares to drop the flow
+    Prepare,
+    /// Deletes metadata
+    DeleteMetadata,
+    /// Invalidate flow cache
+    InvalidateFlowCache,
+    /// Drop flows on flownode
+    DropFlows,
+    // TODO(weny): support to rollback
+}
diff --git a/src/common/meta/src/ddl/drop_flow/metadata.rs b/src/common/meta/src/ddl/drop_flow/metadata.rs
new file mode 100644
index 0000000000..b20a259d91
--- /dev/null
+++ b/src/common/meta/src/ddl/drop_flow/metadata.rs
@@ -0,0 +1,39 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_catalog::format_full_flow_name;
+use snafu::OptionExt;
+
+use crate::ddl::drop_flow::DropFlowProcedure;
+use crate::error::{self, Result};
+
+impl DropFlowProcedure {
+    /// Fetches the flow info.
+    pub(crate) async fn fill_flow_metadata(&mut self) -> Result<()> {
+        let catalog_name = &self.data.task.catalog_name;
+        let flow_name = &self.data.task.flow_name;
+        let flow_info_value = self
+            .context
+            .flow_metadata_manager
+            .flow_info_manager()
+            .get(self.data.task.flow_id)
+            .await?
+            .with_context(|| error::FlowNotFoundSnafu {
+                flow_name: format_full_flow_name(catalog_name, flow_name),
+            })?;
+        self.data.flow_info_value = Some(flow_info_value);
+
+        Ok(())
+    }
+}
diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs
index dd597e54a8..f3840a7d67 100644
--- a/src/common/meta/src/ddl/drop_table.rs
+++ b/src/common/meta/src/ddl/drop_table.rs
@@ -36,10 +36,10 @@ use crate::ddl::DdlContext;
 use crate::error::{self, Result};
 use crate::key::table_route::TableRouteValue;
 use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
-use crate::metrics;
 use crate::region_keeper::OperatingRegionGuard;
 use crate::rpc::ddl::DropTableTask;
 use crate::rpc::router::{operating_leader_regions, RegionRoute};
+use crate::{metrics, ClusterId};
 
 pub struct DropTableProcedure {
     /// The context of procedure runtime.
@@ -55,7 +55,7 @@ pub struct DropTableProcedure {
 impl DropTableProcedure {
     pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
 
-    pub fn new(cluster_id: u64, task: DropTableTask, context: DdlContext) -> Self {
+    pub fn new(cluster_id: ClusterId, task: DropTableTask, context: DdlContext) -> Self {
         let data = DropTableData::new(cluster_id, task);
         let executor = data.build_executor();
         Self {
@@ -252,14 +252,14 @@ impl Procedure for DropTableProcedure {
 #[derive(Debug, Serialize, Deserialize)]
 pub struct DropTableData {
     pub state: DropTableState,
-    pub cluster_id: u64,
+    pub cluster_id: ClusterId,
     pub task: DropTableTask,
     pub physical_region_routes: Vec<RegionRoute>,
     pub physical_table_id: Option<TableId>,
 }
 
 impl DropTableData {
-    pub fn new(cluster_id: u64, task: DropTableTask) -> Self {
+    pub fn new(cluster_id: ClusterId, task: DropTableTask) -> Self {
         Self {
             state: DropTableState::Prepare,
             cluster_id,
diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs
index 030d0a7b68..22a9203461 100644
--- a/src/common/meta/src/ddl/test_util.rs
+++ b/src/common/meta/src/ddl/test_util.rs
@@ -16,6 +16,7 @@ pub mod alter_table;
 pub mod columns;
 pub mod create_table;
 pub mod datanode_handler;
+pub mod flownode_handler;
 
 use std::collections::HashMap;
diff --git a/src/common/meta/src/ddl/test_util/flownode_handler.rs b/src/common/meta/src/ddl/test_util/flownode_handler.rs
new file mode 100644
index 0000000000..357d7a7fda
--- /dev/null
+++ b/src/common/meta/src/ddl/test_util/flownode_handler.rs
@@ -0,0 +1,43 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::region::InsertRequests;
+use common_telemetry::debug;
+
+use crate::error::Result;
+use crate::peer::Peer;
+use crate::test_util::MockFlownodeHandler;
+
+#[derive(Clone)]
+pub struct NaiveFlownodeHandler;
+
+#[async_trait::async_trait]
+impl MockFlownodeHandler for NaiveFlownodeHandler {
+    async fn handle(&self, peer: &Peer, request: FlowRequest) -> Result<FlowResponse> {
+        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
+        Ok(FlowResponse {
+            affected_rows: 0,
+            ..Default::default()
+        })
+    }
+
+    async fn handle_inserts(
+        &self,
+        _peer: &Peer,
+        _requests: InsertRequests,
+    ) -> Result<FlowResponse> {
+        unreachable!()
+    }
+}
diff --git a/src/common/meta/src/ddl/tests.rs b/src/common/meta/src/ddl/tests.rs
index ff0261037c..46019d8c25 100644
--- a/src/common/meta/src/ddl/tests.rs
+++ b/src/common/meta/src/ddl/tests.rs
@@ -14,7 +14,9 @@
 
 mod alter_logical_tables;
 mod alter_table;
+mod create_flow;
 mod create_logical_tables;
 mod create_table;
 mod drop_database;
+mod drop_flow;
 mod drop_table;
diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs
new file mode 100644
index 0000000000..415fc12f62
--- /dev/null
+++ b/src/common/meta/src/ddl/tests/create_flow.rs
@@ -0,0 +1,149 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
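`NaiveFlownodeHandler` acknowledges every flow request. Because the `MockFlownodeHandler` trait (added to `test_util.rs` further down in this diff) ships `unimplemented!()` defaults, a test can override only what it needs — for instance, a hypothetical handler that counts the requests it serves:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use api::v1::flow::{FlowRequest, FlowResponse};

use crate::error::Result;
use crate::peer::Peer;
use crate::test_util::MockFlownodeHandler;

/// Hypothetical handler that records how many flow requests it served;
/// `handle_inserts` keeps the trait's `unimplemented!()` default.
#[derive(Clone, Default)]
pub struct CountingFlownodeHandler {
    served: Arc<AtomicUsize>,
}

#[async_trait::async_trait]
impl MockFlownodeHandler for CountingFlownodeHandler {
    async fn handle(&self, _peer: &Peer, _request: FlowRequest) -> Result<FlowResponse> {
        self.served.fetch_add(1, Ordering::Relaxed);
        Ok(FlowResponse::default())
    }
}
```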
+
+use std::assert_matches::assert_matches;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_procedure_test::execute_procedure_until_done;
+use session::context::QueryContext;
+
+use crate::ddl::create_flow::CreateFlowProcedure;
+use crate::ddl::test_util::create_table::test_create_table_task;
+use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler;
+use crate::ddl::DdlContext;
+use crate::key::table_route::TableRouteValue;
+use crate::key::FlowId;
+use crate::rpc::ddl::CreateFlowTask;
+use crate::table_name::TableName;
+use crate::test_util::{new_ddl_context, MockFlownodeManager};
+use crate::{error, ClusterId};
+
+pub(crate) fn test_create_flow_task(
+    name: &str,
+    source_table_names: Vec<TableName>,
+    sink_table_name: TableName,
+    create_if_not_exists: bool,
+) -> CreateFlowTask {
+    CreateFlowTask {
+        catalog_name: DEFAULT_CATALOG_NAME.to_string(),
+        flow_name: name.to_string(),
+        source_table_names,
+        sink_table_name,
+        or_replace: false,
+        create_if_not_exists,
+        expire_when: "".to_string(),
+        comment: "".to_string(),
+        sql: "raw_sql".to_string(),
+        flow_options: Default::default(),
+    }
+}
+
+#[tokio::test]
+async fn test_create_flow_source_table_not_found() {
+    let cluster_id = 1;
+    let source_table_names = vec![TableName::new(
+        DEFAULT_CATALOG_NAME,
+        DEFAULT_SCHEMA_NAME,
+        "my_table",
+    )];
+    let sink_table_name =
+        TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table");
+    let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
+    let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
+    let ddl_context = new_ddl_context(node_manager);
+    let query_ctx = QueryContext::arc().into();
+    let mut procedure = CreateFlowProcedure::new(cluster_id, task, query_ctx, ddl_context);
+    let err = procedure.on_prepare().await.unwrap_err();
+    assert_matches!(err, error::Error::TableNotFound { .. });
}); +} + +pub(crate) async fn create_test_flow( + ddl_context: &DdlContext, + cluster_id: ClusterId, + flow_name: &str, + source_table_names: Vec, + sink_table_name: TableName, +) -> FlowId { + let task = test_create_flow_task( + flow_name, + source_table_names.clone(), + sink_table_name.clone(), + false, + ); + let query_ctx = QueryContext::arc().into(); + let mut procedure = + CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context.clone()); + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let flow_id = output.downcast_ref::().unwrap(); + + *flow_id +} + +#[tokio::test] +async fn test_create_flow() { + let cluster_id = 1; + let table_id = 1024; + let source_table_names = vec![TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "my_source_table", + )]; + let sink_table_name = + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"); + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + + let task = test_create_table_task("my_source_table", table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + let flow_id = create_test_flow( + &ddl_context, + cluster_id, + "my_flow", + source_table_names.clone(), + sink_table_name.clone(), + ) + .await; + assert_eq!(flow_id, 1024); + + // Creates if not exists + let task = test_create_flow_task( + "my_flow", + source_table_names.clone(), + sink_table_name.clone(), + true, + ); + let query_ctx = QueryContext::arc().into(); + let mut procedure = + CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context.clone()); + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let flow_id = output.downcast_ref::().unwrap(); + assert_eq!(*flow_id, 1024); + + // Creates again + let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false); + let query_ctx = QueryContext::arc().into(); + let mut procedure = CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, error::Error::FlowAlreadyExists { .. }); +} diff --git a/src/common/meta/src/ddl/tests/drop_flow.rs b/src/common/meta/src/ddl/tests/drop_flow.rs new file mode 100644 index 0000000000..b8b62b76cc --- /dev/null +++ b/src/common/meta/src/ddl/tests/drop_flow.rs @@ -0,0 +1,101 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::assert_matches::assert_matches;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_procedure_test::execute_procedure_until_done;
+
+use crate::ddl::drop_flow::DropFlowProcedure;
+use crate::ddl::test_util::create_table::test_create_table_task;
+use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler;
+use crate::ddl::tests::create_flow::create_test_flow;
+use crate::error;
+use crate::key::table_route::TableRouteValue;
+use crate::rpc::ddl::DropFlowTask;
+use crate::table_name::TableName;
+use crate::test_util::{new_ddl_context, MockFlownodeManager};
+
+fn test_drop_flow_task(flow_name: &str, flow_id: u32, drop_if_exists: bool) -> DropFlowTask {
+    DropFlowTask {
+        catalog_name: DEFAULT_CATALOG_NAME.to_string(),
+        flow_name: flow_name.to_string(),
+        flow_id,
+        drop_if_exists,
+    }
+}
+
+#[tokio::test]
+async fn test_drop_flow_not_found() {
+    let cluster_id = 1;
+    let flow_id = 1024;
+    let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
+    let ddl_context = new_ddl_context(node_manager);
+    let task = test_drop_flow_task("my_flow", flow_id, false);
+    let mut procedure = DropFlowProcedure::new(cluster_id, task, ddl_context);
+    let err = procedure.on_prepare().await.unwrap_err();
+    assert_matches!(err, error::Error::FlowNotFound { .. });
+}
+
+#[tokio::test]
+async fn test_drop_flow() {
+    // create a flow
+    let cluster_id = 1;
+    let table_id = 1024;
+    let source_table_names = vec![TableName::new(
+        DEFAULT_CATALOG_NAME,
+        DEFAULT_SCHEMA_NAME,
+        "my_source_table",
+    )];
+    let sink_table_name =
+        TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table");
+    let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
+    let ddl_context = new_ddl_context(node_manager);
+
+    let task = test_create_table_task("my_source_table", table_id);
+    ddl_context
+        .table_metadata_manager
+        .create_table_metadata(
+            task.table_info.clone(),
+            TableRouteValue::physical(vec![]),
+            HashMap::new(),
+        )
+        .await
+        .unwrap();
+    let flow_id = create_test_flow(
+        &ddl_context,
+        cluster_id,
+        "my_flow",
+        source_table_names,
+        sink_table_name,
+    )
+    .await;
+    // Drops the flow
+    let task = test_drop_flow_task("my_flow", flow_id, false);
+    let mut procedure = DropFlowProcedure::new(cluster_id, task, ddl_context.clone());
+    execute_procedure_until_done(&mut procedure).await;
+
+    // Drops if not exists
+    let task = test_drop_flow_task("my_flow", flow_id, true);
+    let mut procedure = DropFlowProcedure::new(cluster_id, task, ddl_context.clone());
+    execute_procedure_until_done(&mut procedure).await;
+
+    // Drops again
+    let task = test_drop_flow_task("my_flow", flow_id, false);
+    let mut procedure = DropFlowProcedure::new(cluster_id, task, ddl_context);
+    let err = procedure.on_prepare().await.unwrap_err();
+    assert_matches!(err, error::Error::FlowNotFound { .. });
}); +} diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index de0316de51..ce1341c0ad 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -38,10 +38,10 @@ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; -use crate::metrics; use crate::rpc::ddl::TruncateTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; use crate::table_name::TableName; +use crate::{metrics, ClusterId}; pub struct TruncateTableProcedure { context: DdlContext, @@ -91,7 +91,7 @@ impl TruncateTableProcedure { pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTable"; pub(crate) fn new( - cluster_id: u64, + cluster_id: ClusterId, task: TruncateTableTask, table_info_value: DeserializedValueWithBytes, region_routes: Vec, @@ -189,7 +189,7 @@ impl TruncateTableProcedure { #[derive(Debug, Serialize, Deserialize)] pub struct TruncateTableData { state: TruncateTableState, - cluster_id: u64, + cluster_id: ClusterId, task: TruncateTableTask, table_info_value: DeserializedValueWithBytes, region_routes: Vec, @@ -197,7 +197,7 @@ pub struct TruncateTableData { impl TruncateTableData { pub fn new( - cluster_id: u64, + cluster_id: ClusterId, task: TruncateTableTask, table_info_value: DeserializedValueWithBytes, region_routes: Vec, diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 50b7a7ddf1..1e4d953490 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -30,6 +30,7 @@ use crate::ddl::create_flow::CreateFlowProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_database::DropDatabaseProcedure; +use crate::ddl::drop_flow::DropFlowProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor}; @@ -152,6 +153,15 @@ impl DdlManager { }) }, ), + ( + DropFlowProcedure::TYPE_NAME, + &|context: DdlContext| -> BoxedProcedureLoader { + Box::new(move |json: &str| { + let context = context.clone(); + DropFlowProcedure::from_json(json, context).map(|p| Box::new(p) as _) + }) + }, + ), ( TruncateTableProcedure::TYPE_NAME, &|context: DdlContext| -> BoxedProcedureLoader { diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index cd75839671..cd6c092cc7 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -270,6 +270,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Flow not found: '{}'", flow_name))] + FlowNotFound { + flow_name: String, + location: Location, + }, + #[snafu(display("Schema nod found, schema: {}", table_schema))] SchemaNotFound { table_schema: String, @@ -511,10 +517,12 @@ impl ErrorExt for Error { | InvalidEngineType { .. } | AlterLogicalTablesInvalidArguments { .. } | CreateLogicalTablesInvalidArguments { .. } - | FlowAlreadyExists { .. } | MismatchPrefix { .. } | DelimiterNotFound { .. } => StatusCode::InvalidArguments, + FlowNotFound { .. } => StatusCode::FlowNotFound, + FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists, + TableNotFound { .. } => StatusCode::TableNotFound, TableAlreadyExists { .. 
             TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs
index 33db460271..1682922ab7 100644
--- a/src/common/meta/src/key/flow.rs
+++ b/src/common/meta/src/key/flow.rs
@@ -23,7 +23,10 @@ use std::sync::Arc;
 use common_telemetry::info;
 use snafu::{ensure, OptionExt};
 
-use self::flow_info::FlowInfoValue;
+use self::flow_info::{FlowInfoKey, FlowInfoValue};
+use self::flow_name::FlowNameKey;
+use self::flownode_flow::FlownodeFlowKey;
+use self::table_flow::TableFlowKey;
 use crate::ensure_values;
 use crate::error::{self, Result};
 use crate::key::flow::flow_info::{FlowInfoManager, FlowInfoManagerRef};
@@ -34,6 +37,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet;
 use crate::key::{FlowId, MetaKey};
 use crate::kv_backend::txn::Txn;
 use crate::kv_backend::KvBackendRef;
+use crate::rpc::store::BatchDeleteRequest;
 
 /// The key of `__flow/` scope.
 #[derive(Debug, PartialEq)]
@@ -205,19 +209,66 @@ impl FlowMetadataManager {
 
         Ok(())
     }
+
+    fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
+        let source_table_ids = flow_value.source_table_ids();
+        let mut keys =
+            Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 1));
+        // Builds flow name key
+        let flow_name = FlowNameKey::new(&flow_value.catalog_name, &flow_value.flow_name);
+        keys.push(flow_name.to_bytes());
+
+        // Builds flow value key
+        let flow_info_key = FlowInfoKey::new(flow_id);
+        keys.push(flow_info_key.to_bytes());
+
+        // Builds flownode flow keys & table flow keys
+        flow_value
+            .flownode_ids
+            .iter()
+            .for_each(|(&partition_id, &flownode_id)| {
+                keys.push(FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes());
+
+                source_table_ids.iter().for_each(|&table_id| {
+                    keys.push(
+                        TableFlowKey::new(table_id, flownode_id, flow_id, partition_id).to_bytes(),
+                    );
+                })
+            });
+
+        keys
+    }
+
+    /// Deletes metadata for flow **permanently**.
+    pub async fn destroy_flow_metadata(
+        &self,
+        flow_id: FlowId,
+        flow_value: &FlowInfoValue,
+    ) -> Result<()> {
+        let keys = self.flow_metadata_keys(flow_id, flow_value);
+        let _ = self
+            .kv_backend
+            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
+            .await?;
+        Ok(())
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use std::assert_matches::assert_matches;
+    use std::collections::BTreeMap;
     use std::sync::Arc;
 
     use futures::TryStreamExt;
+    use table::metadata::TableId;
 
     use super::*;
     use crate::key::flow::table_flow::TableFlowKey;
+    use crate::key::FlowPartitionId;
     use crate::kv_backend::memory::MemoryKvBackend;
     use crate::table_name::TableName;
+    use crate::FlownodeId;
 
     #[derive(Debug)]
     struct MockKey {
@@ -258,28 +309,38 @@ mod tests {
         assert_matches!(err, error::Error::MismatchPrefix { .. });
     }
 
-    #[tokio::test]
-    async fn test_create_flow_metadata() {
-        let mem_kv = Arc::new(MemoryKvBackend::default());
-        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
-        let flow_id = 10;
+    fn test_flow_info_value(
+        flow_id: FlowId,
+        flow_name: &str,
+        flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
+        source_table_ids: Vec<TableId>,
+    ) -> FlowInfoValue {
         let catalog_name = "greptime";
         let sink_table_name = TableName {
             catalog_name: catalog_name.to_string(),
             schema_name: "my_schema".to_string(),
             table_name: "sink_table".to_string(),
         };
-        let flow_value = FlowInfoValue {
+        FlowInfoValue {
             catalog_name: catalog_name.to_string(),
-            flow_name: "flow".to_string(),
-            source_table_ids: vec![1024, 1025, 1026],
+            flow_name: flow_name.to_string(),
+            source_table_ids,
             sink_table_name,
-            flownode_ids: [(0, 1u64)].into(),
+            flownode_ids,
             raw_sql: "raw".to_string(),
             expire_when: "expr".to_string(),
             comment: "hi".to_string(),
             options: Default::default(),
-        };
+        }
+    }
+
+    #[tokio::test]
+    async fn test_create_flow_metadata() {
+        let mem_kv = Arc::new(MemoryKvBackend::default());
+        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
+        let flow_id = 10;
+        let flow_value =
+            test_flow_info_value(flow_id, "flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
         flow_metadata_manager
             .create_flow_metadata(flow_id, flow_value.clone())
             .await
@@ -315,43 +376,18 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_create_table_metadata_flow_exists_err() {
+    async fn test_create_flow_metadata_flow_exists_err() {
         let mem_kv = Arc::new(MemoryKvBackend::default());
         let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
         let flow_id = 10;
         let catalog_name = "greptime";
-        let sink_table_name = TableName {
-            catalog_name: catalog_name.to_string(),
-            schema_name: "my_schema".to_string(),
-            table_name: "sink_table".to_string(),
-        };
-        let flow_value = FlowInfoValue {
-            catalog_name: "greptime".to_string(),
-            flow_name: "flow".to_string(),
-            source_table_ids: vec![1024, 1025, 1026],
-            sink_table_name: sink_table_name.clone(),
-            flownode_ids: [(0, 1u64)].into(),
-            raw_sql: "raw".to_string(),
-            expire_when: "expr".to_string(),
-            comment: "hi".to_string(),
-            options: Default::default(),
-        };
+        let flow_value =
+            test_flow_info_value(flow_id, "flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
         flow_metadata_manager
             .create_flow_metadata(flow_id, flow_value.clone())
             .await
             .unwrap();
 
-        // Creates again.
-        let flow_value = FlowInfoValue {
-            catalog_name: catalog_name.to_string(),
-            flow_name: "flow".to_string(),
-            source_table_ids: vec![1024, 1025, 1026],
-            sink_table_name,
-            flownode_ids: [(0, 1u64)].into(),
-            raw_sql: "raw".to_string(),
-            expire_when: "expr".to_string(),
-            comment: "hi".to_string(),
-            options: Default::default(),
-        };
+        // Creates again
         let err = flow_metadata_manager
             .create_flow_metadata(flow_id + 1, flow_value)
             .await
             .unwrap_err();
@@ -360,27 +396,13 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_create_table_metadata_unexpected_err() {
+    async fn test_create_flow_metadata_unexpected_err() {
         let mem_kv = Arc::new(MemoryKvBackend::default());
         let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
         let flow_id = 10;
         let catalog_name = "greptime";
-        let sink_table_name = TableName {
-            catalog_name: catalog_name.to_string(),
-            schema_name: "my_schema".to_string(),
-            table_name: "sink_table".to_string(),
-        };
-        let flow_value = FlowInfoValue {
-            catalog_name: "greptime".to_string(),
-            flow_name: "flow".to_string(),
-            source_table_ids: vec![1024, 1025, 1026],
-            sink_table_name: sink_table_name.clone(),
-            flownode_ids: [(0, 1u64)].into(),
-            raw_sql: "raw".to_string(),
-            expire_when: "expr".to_string(),
-            comment: "hi".to_string(),
-            options: Default::default(),
-        };
+        let flow_value =
+            test_flow_info_value(flow_id, "flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
         flow_metadata_manager
             .create_flow_metadata(flow_id, flow_value.clone())
             .await
@@ -408,4 +430,30 @@ mod tests {
             .unwrap_err();
         assert!(err.to_string().contains("Reads the different value"));
     }
+
+    #[tokio::test]
+    async fn test_destroy_flow_metadata() {
+        let mem_kv = Arc::new(MemoryKvBackend::default());
+        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
+        let flow_id = 10;
+        let catalog_name = "greptime";
+        let flow_value =
+            test_flow_info_value(flow_id, "flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
+        flow_metadata_manager
+            .create_flow_metadata(flow_id, flow_value.clone())
+            .await
+            .unwrap();
+
+        flow_metadata_manager
+            .destroy_flow_metadata(flow_id, &flow_value)
+            .await
+            .unwrap();
+        // Destroys again
+        flow_metadata_manager
+            .destroy_flow_metadata(flow_id, &flow_value)
+            .await
+            .unwrap();
+        // Ensures all keys are deleted
+        assert!(mem_kv.is_empty())
+    }
 }
diff --git a/src/common/meta/src/lock_key.rs b/src/common/meta/src/lock_key.rs
index 7fbc07655e..ea5e9b5d33 100644
--- a/src/common/meta/src/lock_key.rs
+++ b/src/common/meta/src/lock_key.rs
@@ -18,12 +18,15 @@
 use common_catalog::{format_full_table_name, format_schema_name};
 use common_procedure::StringKey;
 use store_api::storage::{RegionId, TableId};
 
+use crate::key::FlowId;
+
 const CATALOG_LOCK_PREFIX: &str = "__catalog_lock";
 const SCHEMA_LOCK_PREFIX: &str = "__schema_lock";
 const TABLE_LOCK_PREFIX: &str = "__table_lock";
 const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
 const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock";
 const REGION_LOCK_PREFIX: &str = "__region_lock";
+const FLOW_LOCK_PREFIX: &str = "__flow_lock";
 
 /// [CatalogLock] acquires the lock on the tenant level.
 pub enum CatalogLock<'a> {
@@ -199,6 +202,35 @@ impl From<RegionLock> for StringKey {
     }
 }
 
+/// [FlowLock] acquires the lock on the flow level.
+///
+/// Note: Allows reading/modifying the corresponding flow's [FlowInfoValue](crate::key::flow::flow_info::FlowInfoValue),
+/// [FlowNameValue](crate::key::flow::flow_name::FlowNameValue), [FlownodeFlowKey](crate::key::flow::flownode_flow::FlownodeFlowKey),
+/// [TableFlowKey](crate::key::flow::table_flow::TableFlowKey).
+pub enum FlowLock {
+    Read(FlowId),
+    Write(FlowId),
+}
+
+impl Display for FlowLock {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let key = match self {
+            FlowLock::Read(s) => s,
+            FlowLock::Write(s) => s,
+        };
+        write!(f, "{}/{}", FLOW_LOCK_PREFIX, key)
+    }
+}
+
+impl From<FlowLock> for StringKey {
+    fn from(value: FlowLock) -> Self {
+        match value {
+            FlowLock::Write(_) => StringKey::Exclusive(value.to_string()),
+            FlowLock::Read(_) => StringKey::Share(value.to_string()),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use common_procedure::StringKey;
@@ -246,6 +278,12 @@ mod tests {
             string_key,
             StringKey::Exclusive(format!("{}/{}", TABLE_NAME_LOCK_PREFIX, "foo.bar.baz"))
         );
+        // The flow name lock
+        let string_key: StringKey = FlowNameLock::new("foo", "baz").into();
+        assert_eq!(
+            string_key,
+            StringKey::Exclusive(format!("{}/{}", FLOW_NAME_LOCK_PREFIX, "foo.baz"))
+        );
         // The region lock
         let region_id = RegionId::new(1024, 1);
         let string_key: StringKey = RegionLock::Read(region_id).into();
@@ -258,5 +296,17 @@ mod tests {
             string_key,
             StringKey::Exclusive(format!("{}/{}", REGION_LOCK_PREFIX, region_id.as_u64()))
         );
+        // The flow lock
+        let flow_id = 1024;
+        let string_key: StringKey = FlowLock::Read(flow_id).into();
+        assert_eq!(
+            string_key,
+            StringKey::Share(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
+        );
+        let string_key: StringKey = FlowLock::Write(flow_id).into();
+        assert_eq!(
+            string_key,
+            StringKey::Exclusive(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
+        );
     }
 }
diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs
index 34bb95dc0c..a4c7feac39 100644
--- a/src/common/meta/src/metrics.rs
+++ b/src/common/meta/src/metrics.rs
@@ -45,6 +45,12 @@ lazy_static! {
         &["step"]
     )
     .unwrap();
+    pub static ref METRIC_META_PROCEDURE_DROP_FLOW: HistogramVec = register_histogram_vec!(
+        "greptime_meta_procedure_drop_flow",
+        "meta procedure drop flow",
+        &["step"]
+    )
+    .unwrap();
     pub static ref METRIC_META_PROCEDURE_CREATE_TABLES: HistogramVec = register_histogram_vec!(
         "greptime_meta_procedure_create_tables",
         "meta procedure create tables",
diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs
index 1493d864c0..b009a257e6 100644
--- a/src/common/meta/src/rpc/ddl.rs
+++ b/src/common/meta/src/rpc/ddl.rs
@@ -39,6 +39,7 @@ use table::metadata::{RawTableInfo, TableId};
 use table::table_reference::TableReference;
 
 use crate::error::{self, Result};
+use crate::key::FlowId;
 use crate::table_name::TableName;
 
 #[derive(Debug, Clone)]
@@ -276,11 +277,11 @@ impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
             pid: Some(ProcedureId { key: val.key }),
             table_id: val
                 .table_id
-                .map(|table_id| api::v1::meta::TableId { id: table_id }),
+                .map(|table_id| api::v1::TableId { id: table_id }),
             table_ids: val
                 .table_ids
                 .into_iter()
-                .map(|id| api::v1::meta::TableId { id })
+                .map(|id| api::v1::TableId { id })
                 .collect(),
             ..Default::default()
         }
@@ -818,9 +819,12 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
 }
 
 /// Drop flow
+#[derive(Debug, Serialize, Deserialize)]
 pub struct DropFlowTask {
     pub catalog_name: String,
     pub flow_name: String,
+    pub flow_id: FlowId,
+    pub drop_if_exists: bool,
 }
 
 impl TryFrom<PbDropFlowTask> for DropFlowTask {
@@ -830,12 +834,21 @@ impl TryFrom<PbDropFlowTask> for DropFlowTask {
         let DropFlowExpr {
             catalog_name,
             flow_name,
+            flow_id,
+            drop_if_exists,
         } = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
-            err_msg: "expected sink_table_name",
+            err_msg: "expected drop_flow",
         })?;
+        let flow_id = flow_id
+            .context(error::InvalidProtoMsgSnafu {
+                err_msg: "expected flow_id",
+            })?
+            .id;
         Ok(DropFlowTask {
             catalog_name,
             flow_name,
+            flow_id,
+            drop_if_exists,
         })
     }
 }
@@ -845,12 +858,16 @@ impl From<DropFlowTask> for PbDropFlowTask {
     fn from(
         DropFlowTask {
             catalog_name,
             flow_name,
+            flow_id,
+            drop_if_exists,
         }: DropFlowTask,
     ) -> Self {
         PbDropFlowTask {
             drop_flow: Some(DropFlowExpr {
                 catalog_name,
                 flow_name,
+                flow_id: Some(api::v1::FlowId { id: flow_id }),
+                drop_if_exists,
             }),
         }
     }
 }
diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs
index 7c9ed13f5e..892f01da0c 100644
--- a/src/common/meta/src/test_util.rs
+++ b/src/common/meta/src/test_util.rs
@@ -15,7 +15,8 @@
 use std::sync::Arc;
 
 use api::region::RegionResponse;
-use api::v1::region::{QueryRequest, RegionRequest};
+use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
 pub use common_base::AffectedRows;
 use common_recordbatch::SendableRecordBatchStream;
 
@@ -28,7 +29,9 @@ use crate::key::flow::FlowMetadataManager;
 use crate::key::TableMetadataManager;
 use crate::kv_backend::memory::MemoryKvBackend;
 use crate::kv_backend::KvBackendRef;
-use crate::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager, NodeManagerRef};
+use crate::node_manager::{
+    Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
+};
 use crate::peer::Peer;
 use crate::region_keeper::MemoryRegionKeeper;
 use crate::sequence::SequenceBuilder;
@@ -45,7 +48,22 @@ pub trait MockDatanodeHandler: Sync + Send + Clone {
     ) -> Result<SendableRecordBatchStream>;
 }
 
-/// A mock struct implements [DatanodeManager].
+#[async_trait::async_trait]
+pub trait MockFlownodeHandler: Sync + Send + Clone {
+    async fn handle(&self, _peer: &Peer, _request: FlowRequest) -> Result<FlowResponse> {
+        unimplemented!()
+    }
+
+    async fn handle_inserts(
+        &self,
+        _peer: &Peer,
+        _requests: InsertRequests,
+    ) -> Result<FlowResponse> {
+        unimplemented!()
+    }
+}
+
+/// A mock struct implementing [NodeManager]; only the `datanode` method is supported.
 #[derive(Clone)]
 pub struct MockDatanodeManager<T> {
     handler: T,
@@ -57,15 +75,27 @@ impl<T> MockDatanodeManager<T> {
     }
 }
 
+/// A mock struct implementing [NodeManager]; only the `flownode` method is supported.
+#[derive(Clone)]
+pub struct MockFlownodeManager<T> {
+    handler: T,
+}
+
+impl<T> MockFlownodeManager<T> {
+    pub fn new(handler: T) -> Self {
+        Self { handler }
+    }
+}
+
 /// A mock struct implements [Datanode].
 #[derive(Clone)]
-struct MockDatanode<T> {
+struct MockNode<T> {
     peer: Peer,
     handler: T,
 }
 
 #[async_trait::async_trait]
-impl<T: MockDatanodeHandler> Datanode for MockDatanode<T> {
+impl<T: MockDatanodeHandler> Datanode for MockNode<T> {
     async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
         self.handler.handle(&self.peer, request).await
     }
@@ -78,7 +108,7 @@ impl<T: MockDatanodeHandler> Datanode for MockNode<T> {
 #[async_trait::async_trait]
 impl<T: MockDatanodeHandler> NodeManager for MockDatanodeManager<T> {
     async fn datanode(&self, peer: &Peer) -> DatanodeRef {
-        Arc::new(MockDatanode {
+        Arc::new(MockNode {
             peer: peer.clone(),
             handler: self.handler.clone(),
         })
@@ -89,6 +119,31 @@ impl<T: MockDatanodeHandler> NodeManager for MockDatanodeManager<T> {
     }
 }
 
+#[async_trait::async_trait]
+impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
+    async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
+        self.handler.handle(&self.peer, request).await
+    }
+
+    async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
+        self.handler.handle_inserts(&self.peer, requests).await
+    }
+}
+
+#[async_trait::async_trait]
+impl<T: MockFlownodeHandler> NodeManager for MockFlownodeManager<T> {
+    async fn datanode(&self, _peer: &Peer) -> DatanodeRef {
+        unimplemented!()
+    }
+
+    async fn flownode(&self, peer: &Peer) -> FlownodeRef {
+        Arc::new(MockNode {
+            peer: peer.clone(),
+            handler: self.handler.clone(),
+        })
+    }
+}
+
 /// Returns a test purpose [DdlContext].
 pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
     let kv_backend = Arc::new(MemoryKvBackend::new());
diff --git a/src/common/procedure-test/src/lib.rs b/src/common/procedure-test/src/lib.rs
index 84d44fa273..9f7487aed3 100644
--- a/src/common/procedure-test/src/lib.rs
+++ b/src/common/procedure-test/src/lib.rs
@@ -19,8 +19,8 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use common_procedure::{
-    Context, ContextProvider, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Result,
-    Status,
+    Context, ContextProvider, Output, Procedure, ProcedureId, ProcedureState, ProcedureWithId,
+    Result, Status,
 };
 
 /// A Mock [ContextProvider].
@@ -47,7 +47,7 @@ impl ContextProvider for MockContextProvider {
 ///
 /// # Panics
 /// Panics if the `procedure` has subprocedure to execute.
-pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) {
+pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) -> Option<Output> {
     let ctx = Context {
         procedure_id: ProcedureId::random(),
         provider: Arc::new(MockContextProvider::default()),
@@ -60,7 +60,7 @@ pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) {
             subprocedures.is_empty(),
             "Executing subprocedure is unsupported"
         ),
-        Status::Done { .. } => break,
+        Status::Done { output } => return output,
         }
     }
 }
diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs
index c035a172e3..2c609d4a9b 100644
--- a/src/meta-client/src/client.rs
+++ b/src/meta-client/src/client.rs
@@ -41,6 +41,7 @@ use common_meta::rpc::store::{
     BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
     DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
 };
+use common_meta::ClusterId;
 use common_telemetry::info;
 use heartbeat::Client as HeartbeatClient;
 use lock::Client as LockClient;
@@ -75,7 +76,7 @@ pub struct MetaClientBuilder {
 }
 
 impl MetaClientBuilder {
-    pub fn new(cluster_id: u64, member_id: u64, role: Role) -> Self {
+    pub fn new(cluster_id: ClusterId, member_id: u64, role: Role) -> Self {
         Self {
             id: (cluster_id, member_id),
             role,
diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs
index 3d9fe02e78..09a2f98213 100644
--- a/src/meta-srv/src/handler/node_stat.rs
+++ b/src/meta-srv/src/handler/node_stat.rs
@@ -15,6 +15,7 @@
 use std::collections::HashSet;
 
 use api::v1::meta::HeartbeatRequest;
+use common_meta::ClusterId;
 use common_time::util as time_util;
 use serde::{Deserialize, Serialize};
 use store_api::region_engine::RegionRole;
@@ -26,7 +27,7 @@ use crate::keys::StatKey;
 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
 pub struct Stat {
     pub timestamp_millis: i64,
-    pub cluster_id: u64,
+    pub cluster_id: ClusterId,
     // The datanode Id.
     pub id: u64,
     // The datanode address.
diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs
index 2e43219372..250f736c45 100644
--- a/src/meta-srv/src/keys.rs
+++ b/src/meta-srv/src/keys.rs
@@ -14,6 +14,7 @@
 
 use std::str::FromStr;
 
+use common_meta::ClusterId;
 use lazy_static::lazy_static;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
@@ -41,7 +42,7 @@ lazy_static! {
 
 #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
 pub struct LeaseKey {
-    pub cluster_id: u64,
+    pub cluster_id: ClusterId,
     pub node_id: u64,
 }
 
@@ -132,7 +133,7 @@ impl TryFrom<LeaseKey> for Vec<u8> {
 
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 pub struct StatKey {
-    pub cluster_id: u64,
+    pub cluster_id: ClusterId,
     pub node_id: u64,
 }
 
@@ -237,7 +238,7 @@ impl TryFrom<Vec<u8>> for StatValue {
 
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)]
 pub struct InactiveRegionKey {
-    pub cluster_id: u64,
+    pub cluster_id: ClusterId,
     pub node_id: u64,
     pub region_id: u64,
 }
diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs
index a1c42fe496..a1065d4cbb 100644
--- a/src/meta-srv/src/region/lease_keeper.rs
+++ b/src/meta-srv/src/region/lease_keeper.rs
@@ -19,7 +19,7 @@ use common_meta::key::table_route::TableRouteValue;
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::region_keeper::MemoryRegionKeeperRef;
 use common_meta::rpc::router::RegionRoute;
-use common_meta::DatanodeId;
+use common_meta::{ClusterId, DatanodeId};
 use common_telemetry::warn;
 use snafu::ResultExt;
 use store_api::region_engine::RegionRole;
@@ -167,7 +167,7 @@ impl RegionLeaseKeeper {
     /// and corresponding regions will be added to `non_exists` of [RenewRegionLeasesResponse].
     pub async fn renew_region_leases(
         &self,
-        _cluster_id: u64,
+        _cluster_id: ClusterId,
         datanode_id: DatanodeId,
         regions: &[(RegionId, RegionRole)],
     ) -> Result<RenewRegionLeasesResponse> {
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 04341b5ab0..f9ef72c65c 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -608,12 +608,14 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
         StatusCode::Cancelled => Code::Cancelled,
         StatusCode::TableAlreadyExists
         | StatusCode::TableColumnExists
-        | StatusCode::RegionAlreadyExists => Code::AlreadyExists,
+        | StatusCode::RegionAlreadyExists
+        | StatusCode::FlowAlreadyExists => Code::AlreadyExists,
         StatusCode::TableNotFound
         | StatusCode::RegionNotFound
         | StatusCode::TableColumnNotFound
        | StatusCode::DatabaseNotFound
-        | StatusCode::UserNotFound => Code::NotFound,
+        | StatusCode::UserNotFound
+        | StatusCode::FlowNotFound => Code::NotFound,
         StatusCode::StorageUnavailable | StatusCode::RegionNotReady => Code::Unavailable,
         StatusCode::RuntimeResourcesExhausted
         | StatusCode::RateLimited
diff --git a/src/servers/src/http/error_result.rs b/src/servers/src/http/error_result.rs
index 6e063655f8..40f8cc80a3 100644
--- a/src/servers/src/http/error_result.rs
+++ b/src/servers/src/http/error_result.rs
@@ -96,7 +96,9 @@ impl IntoResponse for ErrorResponse {
             | StatusCode::DatabaseNotFound
             | StatusCode::TableNotFound
             | StatusCode::TableColumnNotFound
-            | StatusCode::PlanQuery => HttpStatusCode::BAD_REQUEST,
+            | StatusCode::PlanQuery
+            | StatusCode::FlowNotFound
+            | StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST,
             StatusCode::PermissionDenied | StatusCode::AuthHeaderNotFound
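A quick usage check of the gRPC mapping added above (`status_to_tonic_code` is public per the `servers/src/error.rs` hunk); this sketch assumes `servers`, `common-error`, and `tonic` are available as dependencies in the caller's crate:

```rust
use common_error::status_code::StatusCode;
use servers::error::status_to_tonic_code;
use tonic::Code;

fn main() {
    // The new flow codes land in the same buckets as their table/region peers.
    assert_eq!(
        status_to_tonic_code(StatusCode::FlowAlreadyExists),
        Code::AlreadyExists
    );
    assert_eq!(status_to_tonic_code(StatusCode::FlowNotFound), Code::NotFound);
}
```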