feat: implement drop flow procedure (#3877)

* feat: implement `destroy_flow_metadata` method

* chore: bump proto to 65c1364

* feat: implement the drop flow procedure

* feat: add `MockFlownodeManager`

* tests: add tests for create flow & drop flow procedure

* chore: apply suggestions from CR

* chore: use `ClusterId`
Weny Xu
2024-05-09 17:23:19 +09:00
committed by GitHub
parent 5140d247e3
commit 5545a8b023
33 changed files with 916 additions and 171 deletions
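
For orientation before the per-file diffs: the new `DropFlowProcedure` is a four-state machine. A condensed view of its dispatch, lifted from `execute` in the new `drop_flow.rs` further down (the trailing comments are editorial summaries, not source text):

match self.data.state {
    DropFlowState::Prepare => self.on_prepare().await,                // verify the flow exists, load its FlowInfoValue
    DropFlowState::DeleteMetadata => self.on_delete_metadata().await, // destroy_flow_metadata deletes all flow keys
    DropFlowState::InvalidateFlowCache => self.on_broadcast().await,  // cache invalidation is still a TODO in this PR
    DropFlowState::DropFlows => self.on_flownode_drop_flows().await,  // send DropRequest to every flownode, then Done
}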

Cargo.lock (generated)

@@ -3894,7 +3894,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e152fcbf173b8759dd8a91ce7f6f4b0ca987828e#e152fcbf173b8759dd8a91ce7f6f4b0ca987828e"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=65c1364d8ee190a8d05cad5758d478b11eff2d35"
dependencies = [
"prost 0.12.4",
"serde",


@@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e152fcbf173b8759dd8a91ce7f6f4b0ca987828e" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "65c1364d8ee190a8d05cad5758d478b11eff2d35" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"


@@ -28,6 +28,12 @@ pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> Strin
format!("{catalog}.{schema}.{table}")
}
/// Formats the fully-qualified flow name.
#[inline]
pub fn format_full_flow_name(catalog: &str, flow: &str) -> String {
format!("{catalog}.{flow}")
}
/// Build db name from catalog and schema string
pub fn build_db_string(catalog: &str, schema: &str) -> String {
if catalog == DEFAULT_CATALOG_NAME {
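
A quick sanity check of the new helper (hypothetical snippet; only `format_full_flow_name` comes from this diff). Unlike tables, flows are qualified by catalog alone, with no schema segment:

use common_catalog::format_full_flow_name;

fn main() {
    assert_eq!(format_full_flow_name("greptime", "my_flow"), "greptime.my_flow");
}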


@@ -97,6 +97,11 @@ pub enum StatusCode {
/// User is not authorized to perform the operation
PermissionDenied = 7006,
// ====== End of auth related status code =====
// ====== Begin of flow related status code =====
FlowAlreadyExists = 8000,
FlowNotFound = 8001,
// ====== End of flow related status code =====
}
impl StatusCode {
@@ -125,8 +130,10 @@ impl StatusCode {
| StatusCode::EngineExecuteQuery
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::RegionAlreadyExists
| StatusCode::RegionNotFound
| StatusCode::FlowAlreadyExists
| StatusCode::FlowNotFound
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
@@ -161,10 +168,12 @@ impl StatusCode {
| StatusCode::InvalidSyntax
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::RegionAlreadyExists
| StatusCode::RegionNotFound
| StatusCode::FlowAlreadyExists
| StatusCode::FlowNotFound
| StatusCode::RegionNotReady
| StatusCode::RegionBusy
| StatusCode::RegionAlreadyExists
| StatusCode::RegionReadonly
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists


@@ -25,6 +25,7 @@ use crate::error::{
InvalidRoleSnafu, ParseNumSnafu, Result,
};
use crate::peer::Peer;
use crate::ClusterId;
const CLUSTER_NODE_INFO_PREFIX: &str = "__meta_cluster_node_info";
@@ -55,7 +56,7 @@ pub trait ClusterInfo {
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
pub cluster_id: u64,
pub cluster_id: ClusterId,
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
/// The node id.
@@ -67,7 +68,7 @@ impl NodeInfoKey {
format!("{}-{}-", CLUSTER_NODE_INFO_PREFIX, cluster_id)
}
pub fn key_prefix_with_role(cluster_id: u64, role: Role) -> String {
pub fn key_prefix_with_role(cluster_id: ClusterId, role: Role) -> String {
format!(
"{}-{}-{}-",
CLUSTER_NODE_INFO_PREFIX,


@@ -29,6 +29,7 @@ use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::ClusterId;
pub mod alter_logical_tables;
pub mod alter_table;
@@ -38,6 +39,7 @@ pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
pub mod drop_database;
pub mod drop_flow;
pub mod drop_table;
pub mod flow_meta;
mod physical_table_metadata;
@@ -83,7 +85,7 @@ pub trait ProcedureExecutor: Send + Sync {
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
pub struct TableMetadataAllocatorContext {
pub cluster_id: u64,
pub cluster_id: ClusterId,
}
/// Metadata allocated to a table.


@@ -45,9 +45,9 @@ use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::metrics;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
use crate::{metrics, ClusterId};
/// The alter table procedure
pub struct AlterTableProcedure {
@@ -61,7 +61,7 @@ impl AlterTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::AlterTable";
pub fn new(
cluster_id: u64,
cluster_id: ClusterId,
table_id: TableId,
task: AlterTableTask,
context: DdlContext,
@@ -269,7 +269,7 @@ enum AlterTableState {
// The serialized data of alter table.
#[derive(Debug, Serialize, Deserialize)]
pub struct AlterTableData {
cluster_id: u64,
cluster_id: ClusterId,
state: AlterTableState,
task: AlterTableTask,
table_id: TableId,


@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod check;
mod metadata;
use std::collections::BTreeMap;
@@ -20,6 +19,7 @@ use std::collections::BTreeMap;
use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
use async_trait::async_trait;
use common_catalog::format_full_flow_name;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
@@ -29,15 +29,16 @@ use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use strum::AsRefStr;
use table::metadata::TableId;
use super::utils::add_peer_context_if_needed;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::FlowId;
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
@@ -68,8 +69,8 @@ impl CreateFlowProcedure {
flow_id: None,
peers: vec![],
source_table_ids: vec![],
state: CreateFlowState::CreateMetadata,
query_context,
state: CreateFlowState::Prepare,
},
}
}
@@ -80,8 +81,49 @@ impl CreateFlowProcedure {
Ok(CreateFlowProcedure { context, data })
}
async fn on_prepare(&mut self) -> Result<Status> {
self.check_creation().await?;
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let catalog_name = &self.data.task.catalog_name;
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
let create_if_not_exists = self.data.task.create_if_not_exists;
let flow_name_value = self
.context
.flow_metadata_manager
.flow_name_manager()
.get(catalog_name, flow_name)
.await?;
if let Some(value) = flow_name_value {
ensure!(
create_if_not_exists,
error::FlowAlreadyExistsSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
);
let flow_id = value.flow_id();
return Ok(Status::done_with_output(flow_id));
}
// Ensures sink table doesn't exist.
let exists = self
.context
.table_metadata_manager
.table_name_manager()
.exists(TableNameKey::new(
&sink_table_name.catalog_name,
&sink_table_name.schema_name,
&sink_table_name.table_name,
))
.await?;
ensure!(
!exists,
error::TableAlreadyExistsSnafu {
table_name: sink_table_name.to_string(),
}
);
self.collect_source_tables().await?;
self.allocate_flow_id().await?;
self.data.state = CreateFlowState::CreateFlows;
@@ -207,7 +249,7 @@ impl From<&CreateFlowData> for CreateRequest {
let source_table_ids = &value.source_table_ids;
CreateRequest {
flow_id: Some(api::v1::flow::TaskId { id: flow_id }),
flow_id: Some(api::v1::FlowId { id: flow_id }),
source_table_ids: source_table_ids
.iter()
.map(|table_id| api::v1::TableId { id: *table_id })


@@ -1,64 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::ensure;
use crate::ddl::create_flow::CreateFlowProcedure;
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
impl CreateFlowProcedure {
/// Checks:
/// - The new task name doesn't exist.
/// - The sink table doesn't exist.
pub(crate) async fn check_creation(&self) -> Result<()> {
let catalog_name = &self.data.task.catalog_name;
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;
// Ensures the task name doesn't exist.
let exists = self
.context
.flow_metadata_manager
.flow_name_manager()
.exists(catalog_name, flow_name)
.await?;
ensure!(
!exists,
error::FlowAlreadyExistsSnafu {
flow_name: format!("{}.{}", catalog_name, flow_name),
}
);
// Ensures sink table doesn't exist.
let exists = self
.context
.table_metadata_manager
.table_name_manager()
.exists(TableNameKey::new(
&sink_table_name.catalog_name,
&sink_table_name.schema_name,
&sink_table_name.table_name,
))
.await?;
ensure!(
!exists,
error::TableAlreadyExistsSnafu {
table_name: sink_table_name.to_string(),
}
);
Ok(())
}
}


@@ -339,7 +339,7 @@ pub struct TableCreator {
}
impl TableCreator {
pub fn new(cluster_id: u64, task: CreateTableTask) -> Self {
pub fn new(cluster_id: ClusterId, task: CreateTableTask) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,


@@ -0,0 +1,213 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod metadata;
use api::v1::flow::{flow_request, DropRequest, FlowRequest};
use async_trait::async_trait;
use common_catalog::format_full_flow_name;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::info;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use strum::AsRefStr;
use super::utils::{add_peer_context_if_needed, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::lock_key::{CatalogLock, FlowLock};
use crate::peer::Peer;
use crate::rpc::ddl::DropFlowTask;
use crate::{metrics, ClusterId};
/// The procedure for dropping a flow.
pub struct DropFlowProcedure {
/// The context of procedure runtime.
pub(crate) context: DdlContext,
/// The serializable data.
pub(crate) data: DropFlowData,
}
impl DropFlowProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::DropFlow";
pub fn new(cluster_id: ClusterId, task: DropFlowTask, context: DdlContext) -> Self {
Self {
context,
data: DropFlowData {
state: DropFlowState::Prepare,
cluster_id,
task,
flow_info_value: None,
},
}
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: DropFlowData = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}
/// Checks whether the flow exists.
/// - Returns early if the flow does not exist and `drop_if_exists` is `true`.
/// - Returns an error if the flow does not exist and `drop_if_exists` is `false`.
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let catalog_name = &self.data.task.catalog_name;
let flow_name = &self.data.task.flow_name;
let exists = self
.context
.flow_metadata_manager
.flow_name_manager()
.exists(catalog_name, flow_name)
.await?;
if !exists && self.data.task.drop_if_exists {
return Ok(Status::done());
}
ensure!(
exists,
error::FlowNotFoundSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name)
}
);
self.fill_flow_metadata().await?;
self.data.state = DropFlowState::DeleteMetadata;
Ok(Status::executing(true))
}
async fn on_flownode_drop_flows(&self) -> Result<Status> {
// Safety: checked
let flownode_ids = &self.data.flow_info_value.as_ref().unwrap().flownode_ids;
let flow_id = self.data.task.flow_id;
let mut drop_flow_tasks = Vec::with_capacity(flownode_ids.len());
for flownode in flownode_ids.values() {
// TODO(weny): use the real peer.
let peer = Peer::new(*flownode, "");
let requester = self.context.node_manager.flownode(&peer).await;
let request = FlowRequest {
body: Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(api::v1::FlowId { id: flow_id }),
})),
..Default::default()
};
drop_flow_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::FlowNotFound {
return Err(add_peer_context_if_needed(peer)(err));
}
}
Ok(())
});
}
join_all(drop_flow_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(Status::done())
}
async fn on_delete_metadata(&mut self) -> Result<Status> {
let flow_id = self.data.task.flow_id;
self.context
.flow_metadata_manager
.destroy_flow_metadata(
flow_id,
// Safety: checked
self.data.flow_info_value.as_ref().unwrap(),
)
.await?;
info!("Deleted flow metadata for flow {flow_id}");
self.data.state = DropFlowState::InvalidateFlowCache;
Ok(Status::executing(true))
}
async fn on_broadcast(&mut self) -> Result<Status> {
// TODO(weny): invalidate the cache.
self.data.state = DropFlowState::DropFlows;
Ok(Status::executing(true))
}
}
#[async_trait]
impl Procedure for DropFlowProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_DROP_FLOW
.with_label_values(&[state.as_ref()])
.start_timer();
match self.data.state {
DropFlowState::Prepare => self.on_prepare().await,
DropFlowState::DeleteMetadata => self.on_delete_metadata().await,
DropFlowState::InvalidateFlowCache => self.on_broadcast().await,
DropFlowState::DropFlows => self.on_flownode_drop_flows().await,
}
.map_err(handle_retry_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let catalog_name = &self.data.task.catalog_name;
let flow_id = self.data.task.flow_id;
let lock_key = vec![
CatalogLock::Read(catalog_name).into(),
FlowLock::Write(flow_id).into(),
];
LockKey::new(lock_key)
}
}
/// The serializable data
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropFlowData {
state: DropFlowState,
cluster_id: ClusterId,
task: DropFlowTask,
pub(crate) flow_info_value: Option<FlowInfoValue>,
}
/// The state of drop flow
#[derive(Debug, Serialize, Deserialize, AsRefStr, PartialEq)]
enum DropFlowState {
/// Prepares to drop the flow
Prepare,
/// Deletes metadata
DeleteMetadata,
/// Invalidate flow cache
InvalidateFlowCache,
/// Drop flows on flownode
DropFlows,
// TODO(weny): support rollback
}
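
Driving the procedure to completion mirrors the drop-flow test added later in this commit; a minimal sketch reusing this PR's test helpers (`MockFlownodeManager`, `NaiveFlownodeHandler`, `new_ddl_context`, `test_drop_flow_task`, and `execute_procedure_until_done` all appear in the diffs below):

let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
let ddl_context = new_ddl_context(node_manager);
let task = test_drop_flow_task("my_flow", flow_id, /* drop_if_exists */ false);
let mut procedure = DropFlowProcedure::new(cluster_id, task, ddl_context);
// Walks Prepare -> DeleteMetadata -> InvalidateFlowCache -> DropFlows to Done.
execute_procedure_until_done(&mut procedure).await;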


@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_catalog::format_full_flow_name;
use snafu::OptionExt;
use crate::ddl::drop_flow::DropFlowProcedure;
use crate::error::{self, Result};
impl DropFlowProcedure {
/// Fetches the flow info.
pub(crate) async fn fill_flow_metadata(&mut self) -> Result<()> {
let catalog_name = &self.data.task.catalog_name;
let flow_name = &self.data.task.flow_name;
let flow_info_value = self
.context
.flow_metadata_manager
.flow_info_manager()
.get(self.data.task.flow_id)
.await?
.with_context(|| error::FlowNotFoundSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
})?;
self.data.flow_info_value = Some(flow_info_value);
Ok(())
}
}


@@ -36,10 +36,10 @@ use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::DropTableTask;
use crate::rpc::router::{operating_leader_regions, RegionRoute};
use crate::{metrics, ClusterId};
pub struct DropTableProcedure {
/// The context of procedure runtime.
@@ -55,7 +55,7 @@ pub struct DropTableProcedure {
impl DropTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable";
pub fn new(cluster_id: u64, task: DropTableTask, context: DdlContext) -> Self {
pub fn new(cluster_id: ClusterId, task: DropTableTask, context: DdlContext) -> Self {
let data = DropTableData::new(cluster_id, task);
let executor = data.build_executor();
Self {
@@ -252,14 +252,14 @@ impl Procedure for DropTableProcedure {
#[derive(Debug, Serialize, Deserialize)]
pub struct DropTableData {
pub state: DropTableState,
pub cluster_id: u64,
pub cluster_id: ClusterId,
pub task: DropTableTask,
pub physical_region_routes: Vec<RegionRoute>,
pub physical_table_id: Option<TableId>,
}
impl DropTableData {
pub fn new(cluster_id: u64, task: DropTableTask) -> Self {
pub fn new(cluster_id: ClusterId, task: DropTableTask) -> Self {
Self {
state: DropTableState::Prepare,
cluster_id,


@@ -16,6 +16,7 @@ pub mod alter_table;
pub mod columns;
pub mod create_table;
pub mod datanode_handler;
pub mod flownode_handler;
use std::collections::HashMap;


@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_telemetry::debug;
use crate::error::Result;
use crate::peer::Peer;
use crate::test_util::MockFlownodeHandler;
#[derive(Clone)]
pub struct NaiveFlownodeHandler;
#[async_trait::async_trait]
impl MockFlownodeHandler for NaiveFlownodeHandler {
async fn handle(&self, peer: &Peer, request: FlowRequest) -> Result<FlowResponse> {
debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
Ok(FlowResponse {
affected_rows: 0,
..Default::default()
})
}
async fn handle_inserts(
&self,
_peer: &Peer,
_requests: InsertRequests,
) -> Result<FlowResponse> {
unreachable!()
}
}


@@ -14,7 +14,9 @@
mod alter_logical_tables;
mod alter_table;
mod create_flow;
mod create_logical_tables;
mod create_table;
mod drop_database;
mod drop_flow;
mod drop_table;


@@ -0,0 +1,149 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure_test::execute_procedure_until_done;
use session::context::QueryContext;
use crate::ddl::create_flow::CreateFlowProcedure;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler;
use crate::ddl::DdlContext;
use crate::key::table_route::TableRouteValue;
use crate::key::FlowId;
use crate::rpc::ddl::CreateFlowTask;
use crate::table_name::TableName;
use crate::test_util::{new_ddl_context, MockFlownodeManager};
use crate::{error, ClusterId};
pub(crate) fn test_create_flow_task(
name: &str,
source_table_names: Vec<TableName>,
sink_table_name: TableName,
create_if_not_exists: bool,
) -> CreateFlowTask {
CreateFlowTask {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
flow_name: name.to_string(),
source_table_names,
sink_table_name,
or_replace: false,
create_if_not_exists,
expire_when: "".to_string(),
comment: "".to_string(),
sql: "raw_sql".to_string(),
flow_options: Default::default(),
}
}
#[tokio::test]
async fn test_create_flow_source_table_not_found() {
let cluster_id = 1;
let source_table_names = vec![TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"my_table",
)];
let sink_table_name =
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table");
let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
let ddl_context = new_ddl_context(node_manager);
let query_ctx = QueryContext::arc().into();
let mut procedure = CreateFlowProcedure::new(cluster_id, task, query_ctx, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, error::Error::TableNotFound { .. });
}
pub(crate) async fn create_test_flow(
ddl_context: &DdlContext,
cluster_id: ClusterId,
flow_name: &str,
source_table_names: Vec<TableName>,
sink_table_name: TableName,
) -> FlowId {
let task = test_create_flow_task(
flow_name,
source_table_names.clone(),
sink_table_name.clone(),
false,
);
let query_ctx = QueryContext::arc().into();
let mut procedure =
CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context.clone());
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let flow_id = output.downcast_ref::<FlowId>().unwrap();
*flow_id
}
#[tokio::test]
async fn test_create_flow() {
let cluster_id = 1;
let table_id = 1024;
let source_table_names = vec![TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"my_source_table",
)];
let sink_table_name =
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table");
let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("my_source_table", table_id);
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
let flow_id = create_test_flow(
&ddl_context,
cluster_id,
"my_flow",
source_table_names.clone(),
sink_table_name.clone(),
)
.await;
assert_eq!(flow_id, 1024);
// Creates if not exists
let task = test_create_flow_task(
"my_flow",
source_table_names.clone(),
sink_table_name.clone(),
true,
);
let query_ctx = QueryContext::arc().into();
let mut procedure =
CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context.clone());
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let flow_id = output.downcast_ref::<FlowId>().unwrap();
assert_eq!(*flow_id, 1024);
// Creates again
let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
let query_ctx = QueryContext::arc().into();
let mut procedure = CreateFlowProcedure::new(cluster_id, task.clone(), query_ctx, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, error::Error::FlowAlreadyExists { .. });
}


@@ -0,0 +1,101 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure_test::execute_procedure_until_done;
use crate::ddl::drop_flow::DropFlowProcedure;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler;
use crate::ddl::tests::create_flow::create_test_flow;
use crate::error;
use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::DropFlowTask;
use crate::table_name::TableName;
use crate::test_util::{new_ddl_context, MockFlownodeManager};
fn test_drop_flow_task(flow_name: &str, flow_id: u32, drop_if_exists: bool) -> DropFlowTask {
DropFlowTask {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
flow_name: flow_name.to_string(),
flow_id,
drop_if_exists,
}
}
#[tokio::test]
async fn test_drop_flow_not_found() {
let cluster_id = 1;
let flow_id = 1024;
let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
let ddl_context = new_ddl_context(node_manager);
let task = test_drop_flow_task("my_flow", flow_id, false);
let mut procedure = DropFlowProcedure::new(cluster_id, task, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, error::Error::FlowNotFound { .. });
}
#[tokio::test]
async fn test_drop_flow() {
// Creates a flow
let cluster_id = 1;
let table_id = 1024;
let source_table_names = vec![TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"my_source_table",
)];
let sink_table_name =
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table");
let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
let ddl_context = new_ddl_context(node_manager);
let task = test_create_table_task("my_source_table", table_id);
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
let flow_id = create_test_flow(
&ddl_context,
cluster_id,
"my_flow",
source_table_names,
sink_table_name,
)
.await;
// Drops the flow
let task = test_drop_flow_task("my_flow", flow_id, false);
let mut procedure = DropFlowProcedure::new(cluster_id, task, ddl_context.clone());
execute_procedure_until_done(&mut procedure).await;
// Drops if not exists
let task = test_drop_flow_task("my_flow", flow_id, true);
let mut procedure = DropFlowProcedure::new(cluster_id, task, ddl_context.clone());
execute_procedure_until_done(&mut procedure).await;
// Drops again
let task = test_drop_flow_task("my_flow", flow_id, false);
let mut procedure = DropFlowProcedure::new(cluster_id, task, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, error::Error::FlowNotFound { .. });
}


@@ -38,10 +38,10 @@ use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::metrics;
use crate::rpc::ddl::TruncateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::table_name::TableName;
use crate::{metrics, ClusterId};
pub struct TruncateTableProcedure {
context: DdlContext,
@@ -91,7 +91,7 @@ impl TruncateTableProcedure {
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTable";
pub(crate) fn new(
cluster_id: u64,
cluster_id: ClusterId,
task: TruncateTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
@@ -189,7 +189,7 @@ impl TruncateTableProcedure {
#[derive(Debug, Serialize, Deserialize)]
pub struct TruncateTableData {
state: TruncateTableState,
cluster_id: u64,
cluster_id: ClusterId,
task: TruncateTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,
@@ -197,7 +197,7 @@ pub struct TruncateTableData {
impl TruncateTableData {
pub fn new(
cluster_id: u64,
cluster_id: ClusterId,
task: TruncateTableTask,
table_info_value: DeserializedValueWithBytes<TableInfoValue>,
region_routes: Vec<RegionRoute>,


@@ -30,6 +30,7 @@ use crate::ddl::create_flow::CreateFlowProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::drop_flow::DropFlowProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
@@ -152,6 +153,15 @@ impl DdlManager {
})
},
),
(
DropFlowProcedure::TYPE_NAME,
&|context: DdlContext| -> BoxedProcedureLoader {
Box::new(move |json: &str| {
let context = context.clone();
DropFlowProcedure::from_json(json, context).map(|p| Box::new(p) as _)
})
},
),
(
TruncateTableProcedure::TYPE_NAME,
&|context: DdlContext| -> BoxedProcedureLoader {


@@ -270,6 +270,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Flow not found: '{}'", flow_name))]
FlowNotFound {
flow_name: String,
location: Location,
},
#[snafu(display("Schema nod found, schema: {}", table_schema))]
SchemaNotFound {
table_schema: String,
@@ -511,10 +517,12 @@ impl ErrorExt for Error {
| InvalidEngineType { .. }
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. }
| FlowAlreadyExists { .. }
| MismatchPrefix { .. }
| DelimiterNotFound { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists,
TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,


@@ -23,7 +23,10 @@ use std::sync::Arc;
use common_telemetry::info;
use snafu::{ensure, OptionExt};
use self::flow_info::FlowInfoValue;
use self::flow_info::{FlowInfoKey, FlowInfoValue};
use self::flow_name::FlowNameKey;
use self::flownode_flow::FlownodeFlowKey;
use self::table_flow::TableFlowKey;
use crate::ensure_values;
use crate::error::{self, Result};
use crate::key::flow::flow_info::{FlowInfoManager, FlowInfoManagerRef};
@@ -34,6 +37,7 @@ use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{FlowId, MetaKey};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchDeleteRequest;
/// The key of `__flow/` scope.
#[derive(Debug, PartialEq)]
@@ -205,19 +209,66 @@ impl FlowMetadataManager {
Ok(())
}
fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
let source_table_ids = flow_value.source_table_ids();
let mut keys =
Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 1));
// Builds flow name key
let flow_name = FlowNameKey::new(&flow_value.catalog_name, &flow_value.flow_name);
keys.push(flow_name.to_bytes());
// Builds flow value key
let flow_info_key = FlowInfoKey::new(flow_id);
keys.push(flow_info_key.to_bytes());
// Builds flownode flow keys & table flow keys
flow_value
.flownode_ids
.iter()
.for_each(|(&partition_id, &flownode_id)| {
keys.push(FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes());
source_table_ids.iter().for_each(|&table_id| {
keys.push(
TableFlowKey::new(table_id, flownode_id, flow_id, partition_id).to_bytes(),
);
})
});
keys
}
/// Deletes metadata for a flow **permanently**.
pub async fn destroy_flow_metadata(
&self,
flow_id: FlowId,
flow_value: &FlowInfoValue,
) -> Result<()> {
let keys = self.flow_metadata_keys(flow_id, flow_value);
let _ = self
.kv_backend
.batch_delete(BatchDeleteRequest::new().with_keys(keys))
.await?;
Ok(())
}
}
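
To make the `Vec::with_capacity` arithmetic in `flow_metadata_keys` concrete, a worked example with the values used by the tests below (flow_id 10, catalog "greptime", one partition 0 on flownode 1, source tables 1024..=1026). It yields 2 + 1 * (3 + 1) = 6 keys, all removed by a single batch delete in `destroy_flow_metadata`:

// FlowNameKey::new("greptime", "flow")
// FlowInfoKey::new(10)
// FlownodeFlowKey::new(1, 10, 0)
// TableFlowKey::new(1024, 1, 10, 0)
// TableFlowKey::new(1025, 1, 10, 0)
// TableFlowKey::new(1026, 1, 10, 0)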
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::BTreeMap;
use std::sync::Arc;
use futures::TryStreamExt;
use table::metadata::TableId;
use super::*;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::FlowPartitionId;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::table_name::TableName;
use crate::FlownodeId;
#[derive(Debug)]
struct MockKey {
@@ -258,28 +309,38 @@ mod tests {
assert_matches!(err, error::Error::MismatchPrefix { .. });
}
#[tokio::test]
async fn test_create_flow_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
fn test_flow_info_value(
flow_id: FlowId,
flow_name: &str,
flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
source_table_ids: Vec<TableId>,
) -> FlowInfoValue {
let catalog_name = "greptime";
let sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
schema_name: "my_schema".to_string(),
table_name: "sink_table".to_string(),
};
let flow_value = FlowInfoValue {
FlowInfoValue {
catalog_name: catalog_name.to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
flow_name: flow_name.to_string(),
source_table_ids,
sink_table_name,
flownode_ids: [(0, 1u64)].into(),
flownode_ids,
raw_sql: "raw".to_string(),
expire_when: "expr".to_string(),
comment: "hi".to_string(),
options: Default::default(),
};
}
}
#[tokio::test]
async fn test_create_flow_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
let flow_value =
test_flow_info_value(flow_id, "flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone())
.await
@@ -315,43 +376,18 @@ mod tests {
}
#[tokio::test]
async fn test_create_table_metadata_flow_exists_err() {
async fn test_create_flow_metadata_flow_exists_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let catalog_name = "greptime";
let sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
schema_name: "my_schema".to_string(),
table_name: "sink_table".to_string(),
};
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: sink_table_name.clone(),
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_when: "expr".to_string(),
comment: "hi".to_string(),
options: Default::default(),
};
let flow_value =
test_flow_info_value(flow_id, "flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
// Creates again.
let flow_value = FlowInfoValue {
catalog_name: catalog_name.to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name,
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_when: "expr".to_string(),
comment: "hi".to_string(),
options: Default::default(),
};
// Creates again
let err = flow_metadata_manager
.create_flow_metadata(flow_id + 1, flow_value)
.await
@@ -360,27 +396,13 @@ mod tests {
}
#[tokio::test]
async fn test_create_table_metadata_unexpected_err() {
async fn test_create_flow_metadata_unexpected_err() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
let flow_id = 10;
let catalog_name = "greptime";
let sink_table_name = TableName {
catalog_name: catalog_name.to_string(),
schema_name: "my_schema".to_string(),
table_name: "sink_table".to_string(),
};
let flow_value = FlowInfoValue {
catalog_name: "greptime".to_string(),
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
sink_table_name: sink_table_name.clone(),
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_when: "expr".to_string(),
comment: "hi".to_string(),
options: Default::default(),
};
let flow_value =
test_flow_info_value(flow_id, "flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone())
.await
@@ -408,4 +430,30 @@ mod tests {
.unwrap_err();
assert!(err.to_string().contains("Reads the different value"));
}
#[tokio::test]
async fn test_destroy_flow_metadata() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
let flow_id = 10;
let catalog_name = "greptime";
let flow_value =
test_flow_info_value(flow_id, "flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
flow_metadata_manager
.create_flow_metadata(flow_id, flow_value.clone())
.await
.unwrap();
flow_metadata_manager
.destroy_flow_metadata(flow_id, &flow_value)
.await
.unwrap();
// Destroys again
flow_metadata_manager
.destroy_flow_metadata(flow_id, &flow_value)
.await
.unwrap();
// Ensures all keys are deleted
assert!(mem_kv.is_empty())
}
}


@@ -18,12 +18,15 @@ use common_catalog::{format_full_table_name, format_schema_name};
use common_procedure::StringKey;
use store_api::storage::{RegionId, TableId};
use crate::key::FlowId;
const CATALOG_LOCK_PREFIX: &str = "__catalog_lock";
const SCHEMA_LOCK_PREFIX: &str = "__schema_lock";
const TABLE_LOCK_PREFIX: &str = "__table_lock";
const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock";
const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock";
const REGION_LOCK_PREFIX: &str = "__region_lock";
const FLOW_LOCK_PREFIX: &str = "__flow_lock";
/// [CatalogLock] acquires the lock on the tenant level.
pub enum CatalogLock<'a> {
@@ -199,6 +202,35 @@ impl From<RegionLock> for StringKey {
}
}
/// [FlowLock] acquires the lock on the flow level.
///
/// Note: allows reading/modifying the corresponding flow's [FlowInfoValue](crate::key::flow::flow_info::FlowInfoValue),
/// [FlowNameValue](crate::key::flow::flow_name::FlowNameValue), [FlownodeFlowKey](crate::key::flow::flownode_flow::FlownodeFlowKey),
/// [TableFlowKey](crate::key::flow::table_flow::TableFlowKey).
pub enum FlowLock {
Read(FlowId),
Write(FlowId),
}
impl Display for FlowLock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let key = match self {
FlowLock::Read(s) => s,
FlowLock::Write(s) => s,
};
write!(f, "{}/{}", FLOW_LOCK_PREFIX, key)
}
}
impl From<FlowLock> for StringKey {
fn from(value: FlowLock) -> Self {
match value {
FlowLock::Write(_) => StringKey::Exclusive(value.to_string()),
FlowLock::Read(_) => StringKey::Share(value.to_string()),
}
}
}
#[cfg(test)]
mod tests {
use common_procedure::StringKey;
@@ -246,6 +278,12 @@ mod tests {
string_key,
StringKey::Exclusive(format!("{}/{}", TABLE_NAME_LOCK_PREFIX, "foo.bar.baz"))
);
// The flow name lock
let string_key: StringKey = FlowNameLock::new("foo", "baz").into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", FLOW_NAME_LOCK_PREFIX, "foo.baz"))
);
// The region lock
let region_id = RegionId::new(1024, 1);
let string_key: StringKey = RegionLock::Read(region_id).into();
@@ -258,5 +296,17 @@ mod tests {
string_key,
StringKey::Exclusive(format!("{}/{}", REGION_LOCK_PREFIX, region_id.as_u64()))
);
// The flow lock
let flow_id = 1024;
let string_key: StringKey = FlowLock::Read(flow_id).into();
assert_eq!(
string_key,
StringKey::Share(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
);
let string_key: StringKey = FlowLock::Write(flow_id).into();
assert_eq!(
string_key,
StringKey::Exclusive(format!("{}/{}", FLOW_LOCK_PREFIX, flow_id))
);
}
}


@@ -45,6 +45,12 @@ lazy_static! {
&["step"]
)
.unwrap();
pub static ref METRIC_META_PROCEDURE_DROP_FLOW: HistogramVec = register_histogram_vec!(
"greptime_meta_procedure_drop_flow",
"meta procedure drop flow",
&["step"]
)
.unwrap();
pub static ref METRIC_META_PROCEDURE_CREATE_TABLES: HistogramVec = register_histogram_vec!(
"greptime_meta_procedure_create_tables",
"meta procedure create tables",


@@ -39,6 +39,7 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::error::{self, Result};
use crate::key::FlowId;
use crate::table_name::TableName;
#[derive(Debug, Clone)]
@@ -276,11 +277,11 @@ impl From<SubmitDdlTaskResponse> for PbDdlTaskResponse {
pid: Some(ProcedureId { key: val.key }),
table_id: val
.table_id
.map(|table_id| api::v1::meta::TableId { id: table_id }),
.map(|table_id| api::v1::TableId { id: table_id }),
table_ids: val
.table_ids
.into_iter()
.map(|id| api::v1::meta::TableId { id })
.map(|id| api::v1::TableId { id })
.collect(),
..Default::default()
}
@@ -818,9 +819,12 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
}
/// Drop flow
#[derive(Debug, Serialize, Deserialize)]
pub struct DropFlowTask {
pub catalog_name: String,
pub flow_name: String,
pub flow_id: FlowId,
pub drop_if_exists: bool,
}
impl TryFrom<PbDropFlowTask> for DropFlowTask {
@@ -830,12 +834,21 @@ impl TryFrom<PbDropFlowTask> for DropFlowTask {
let DropFlowExpr {
catalog_name,
flow_name,
flow_id,
drop_if_exists,
} = pb.drop_flow.context(error::InvalidProtoMsgSnafu {
err_msg: "expected sink_table_name",
err_msg: "expected drop_flow",
})?;
let flow_id = flow_id
.context(error::InvalidProtoMsgSnafu {
err_msg: "expected flow_id",
})?
.id;
Ok(DropFlowTask {
catalog_name,
flow_name,
flow_id,
drop_if_exists,
})
}
}
@@ -845,12 +858,16 @@ impl From<DropFlowTask> for PbDropFlowTask {
DropFlowTask {
catalog_name,
flow_name,
flow_id,
drop_if_exists,
}: DropFlowTask,
) -> Self {
PbDropFlowTask {
drop_flow: Some(DropFlowExpr {
catalog_name,
flow_name,
flow_id: Some(api::v1::FlowId { id: flow_id }),
drop_if_exists,
}),
}
}


@@ -15,7 +15,8 @@
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
@@ -28,7 +29,9 @@ use crate::key::flow::FlowMetadataManager;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager, NodeManagerRef};
use crate::node_manager::{
Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
};
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
@@ -45,7 +48,22 @@ pub trait MockDatanodeHandler: Sync + Send + Clone {
) -> Result<SendableRecordBatchStream>;
}
/// A mock struct implements [DatanodeManager].
#[async_trait::async_trait]
pub trait MockFlownodeHandler: Sync + Send + Clone {
async fn handle(&self, _peer: &Peer, _request: FlowRequest) -> Result<FlowResponse> {
unimplemented!()
}
async fn handle_inserts(
&self,
_peer: &Peer,
_requests: InsertRequests,
) -> Result<FlowResponse> {
unimplemented!()
}
}
/// A mock struct implementing [NodeManager] that only implements the `datanode` method.
#[derive(Clone)]
pub struct MockDatanodeManager<T> {
handler: T,
@@ -57,15 +75,27 @@ impl<T> MockDatanodeManager<T> {
}
}
/// A mock struct implementing [NodeManager] that only implements the `flownode` method.
#[derive(Clone)]
pub struct MockFlownodeManager<T> {
handler: T,
}
impl<T> MockFlownodeManager<T> {
pub fn new(handler: T) -> Self {
Self { handler }
}
}
/// A mock struct implements [Datanode].
#[derive(Clone)]
struct MockDatanode<T> {
struct MockNode<T> {
peer: Peer,
handler: T,
}
#[async_trait::async_trait]
impl<T: MockDatanodeHandler> Datanode for MockDatanode<T> {
impl<T: MockDatanodeHandler> Datanode for MockNode<T> {
async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
self.handler.handle(&self.peer, request).await
}
@@ -78,7 +108,7 @@ impl<T: MockDatanodeHandler> Datanode for MockDatanode<T> {
#[async_trait::async_trait]
impl<T: MockDatanodeHandler + 'static> NodeManager for MockDatanodeManager<T> {
async fn datanode(&self, peer: &Peer) -> DatanodeRef {
Arc::new(MockDatanode {
Arc::new(MockNode {
peer: peer.clone(),
handler: self.handler.clone(),
})
@@ -89,6 +119,31 @@ impl<T: MockDatanodeHandler + 'static> NodeManager for MockDatanodeManager<T> {
}
}
#[async_trait::async_trait]
impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
self.handler.handle(&self.peer, request).await
}
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
self.handler.handle_inserts(&self.peer, requests).await
}
}
#[async_trait::async_trait]
impl<T: MockFlownodeHandler + 'static> NodeManager for MockFlownodeManager<T> {
async fn datanode(&self, _peer: &Peer) -> DatanodeRef {
unimplemented!()
}
async fn flownode(&self, peer: &Peer) -> FlownodeRef {
Arc::new(MockNode {
peer: peer.clone(),
handler: self.handler.clone(),
})
}
}
/// Returns a test purpose [DdlContext].
pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
let kv_backend = Arc::new(MemoryKvBackend::new());


@@ -19,8 +19,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_procedure::{
Context, ContextProvider, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Result,
Status,
Context, ContextProvider, Output, Procedure, ProcedureId, ProcedureState, ProcedureWithId,
Result, Status,
};
/// A Mock [ContextProvider].
@@ -47,7 +47,7 @@ impl ContextProvider for MockContextProvider {
///
/// # Panics
/// Panics if the `procedure` has subprocedures to execute.
pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) {
pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) -> Option<Output> {
let ctx = Context {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
@@ -60,7 +60,7 @@ pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) {
subprocedures.is_empty(),
"Executing subprocedure is unsupported"
),
Status::Done { .. } => break,
Status::Done { output } => return output,
}
}
}
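
With the new `Option<Output>` return value, callers can recover a procedure's result; the create-flow tests earlier in this diff use exactly this to read back the allocated `FlowId`:

let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let flow_id = output.downcast_ref::<FlowId>().unwrap();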


@@ -41,6 +41,7 @@ use common_meta::rpc::store::{
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::ClusterId;
use common_telemetry::info;
use heartbeat::Client as HeartbeatClient;
use lock::Client as LockClient;
@@ -75,7 +76,7 @@ pub struct MetaClientBuilder {
}
impl MetaClientBuilder {
pub fn new(cluster_id: u64, member_id: u64, role: Role) -> Self {
pub fn new(cluster_id: ClusterId, member_id: u64, role: Role) -> Self {
Self {
id: (cluster_id, member_id),
role,


@@ -15,6 +15,7 @@
use std::collections::HashSet;
use api::v1::meta::HeartbeatRequest;
use common_meta::ClusterId;
use common_time::util as time_util;
use serde::{Deserialize, Serialize};
use store_api::region_engine::RegionRole;
@@ -26,7 +27,7 @@ use crate::keys::StatKey;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
pub timestamp_millis: i64,
pub cluster_id: u64,
pub cluster_id: ClusterId,
// The datanode Id.
pub id: u64,
// The datanode address.


@@ -14,6 +14,7 @@
use std::str::FromStr;
use common_meta::ClusterId;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
@@ -41,7 +42,7 @@ lazy_static! {
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct LeaseKey {
pub cluster_id: u64,
pub cluster_id: ClusterId,
pub node_id: u64,
}
@@ -132,7 +133,7 @@ impl TryFrom<LeaseValue> for Vec<u8> {
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct StatKey {
pub cluster_id: u64,
pub cluster_id: ClusterId,
pub node_id: u64,
}
@@ -237,7 +238,7 @@ impl TryFrom<Vec<u8>> for StatValue {
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct InactiveRegionKey {
pub cluster_id: u64,
pub cluster_id: ClusterId,
pub node_id: u64,
pub region_id: u64,
}


@@ -19,7 +19,7 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::rpc::router::RegionRoute;
use common_meta::DatanodeId;
use common_meta::{ClusterId, DatanodeId};
use common_telemetry::warn;
use snafu::ResultExt;
use store_api::region_engine::RegionRole;
@@ -167,7 +167,7 @@ impl RegionLeaseKeeper {
/// and corresponding regions will be added to `non_exists` of [RenewRegionLeasesResponse].
pub async fn renew_region_leases(
&self,
_cluster_id: u64,
_cluster_id: ClusterId,
datanode_id: DatanodeId,
regions: &[(RegionId, RegionRole)],
) -> Result<RenewRegionLeasesResponse> {


@@ -608,12 +608,14 @@ pub fn status_to_tonic_code(status_code: StatusCode) -> Code {
StatusCode::Cancelled => Code::Cancelled,
StatusCode::TableAlreadyExists
| StatusCode::TableColumnExists
| StatusCode::RegionAlreadyExists => Code::AlreadyExists,
| StatusCode::RegionAlreadyExists
| StatusCode::FlowAlreadyExists => Code::AlreadyExists,
StatusCode::TableNotFound
| StatusCode::RegionNotFound
| StatusCode::TableColumnNotFound
| StatusCode::DatabaseNotFound
| StatusCode::UserNotFound => Code::NotFound,
| StatusCode::UserNotFound
| StatusCode::FlowNotFound => Code::NotFound,
StatusCode::StorageUnavailable | StatusCode::RegionNotReady => Code::Unavailable,
StatusCode::RuntimeResourcesExhausted
| StatusCode::RateLimited


@@ -96,7 +96,9 @@ impl IntoResponse for ErrorResponse {
| StatusCode::DatabaseNotFound
| StatusCode::TableNotFound
| StatusCode::TableColumnNotFound
| StatusCode::PlanQuery => HttpStatusCode::BAD_REQUEST,
| StatusCode::PlanQuery
| StatusCode::FlowNotFound
| StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST,
StatusCode::PermissionDenied
| StatusCode::AuthHeaderNotFound