diff --git a/src/catalog/src/information_extension.rs b/src/catalog/src/information_extension.rs index 4d829ae01a..e481d469bd 100644 --- a/src/catalog/src/information_extension.rs +++ b/src/catalog/src/information_extension.rs @@ -16,8 +16,8 @@ use api::v1::meta::ProcedureStatus; use common_error::ext::BoxedError; use common_meta::cluster::{ClusterInfo, NodeInfo}; use common_meta::datanode::RegionStat; -use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; use common_meta::key::flow::flow_state::FlowStat; +use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor}; use common_meta::rpc::procedure; use common_procedure::{ProcedureInfo, ProcedureState}; use meta_client::MetaClientRef; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 282789d626..830fce9f94 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -34,13 +34,14 @@ use common_meta::cluster::{NodeInfo, NodeStatus}; use common_meta::datanode::RegionStat; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; -use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef}; +use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use common_meta::ddl_manager::DdlManager; use common_meta::key::flow::flow_state::FlowStat; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; +use common_meta::procedure_executor::LocalProcedureExecutor; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; @@ -609,9 +610,8 @@ impl StartCommand { flow_metadata_allocator: flow_metadata_allocator.clone(), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }; - let procedure_manager_c = procedure_manager.clone(); - let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true) + let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true) .context(error::InitDdlManagerSnafu)?; #[cfg(feature = "enterprise")] let ddl_manager = { @@ -619,7 +619,11 @@ impl StartCommand { plugins.get(); ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager) }; - let ddl_task_executor: ProcedureExecutorRef = Arc::new(ddl_manager); + + let procedure_executor = Arc::new(LocalProcedureExecutor::new( + Arc::new(ddl_manager), + procedure_manager.clone(), + )); let fe_instance = FrontendBuilder::new( fe_opts.clone(), @@ -627,7 +631,7 @@ impl StartCommand { layered_cache_registry.clone(), catalog_manager.clone(), node_manager.clone(), - ddl_task_executor.clone(), + procedure_executor.clone(), process_manager, ) .with_plugin(plugins.clone()) @@ -652,7 +656,7 @@ impl StartCommand { catalog_manager.clone(), kv_backend.clone(), layered_cache_registry.clone(), - ddl_task_executor.clone(), + procedure_executor, node_manager, ) .await diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 13f7f08a5f..0fc65419c6 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -15,25 +15,17 @@ use std::collections::HashMap; use std::sync::Arc; -use api::v1::meta::ProcedureDetailResponse; -use common_telemetry::tracing_context::W3cTrace; use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::cache_invalidator::CacheInvalidatorRef; use crate::ddl::flow_meta::FlowMetadataAllocatorRef; use crate::ddl::table_meta::TableMetadataAllocatorRef; -use crate::error::{Result, UnsupportedSnafu}; use crate::key::flow::FlowMetadataManagerRef; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; use crate::node_manager::NodeManagerRef; use crate::region_keeper::MemoryRegionKeeperRef; use crate::region_registry::LeaderRegionRegistryRef; -use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; -use crate::rpc::procedure::{ - AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, - RemoveRegionFollowerRequest, -}; use crate::DatanodeId; pub mod alter_database; @@ -59,64 +51,6 @@ pub(crate) mod tests; pub mod truncate_table; pub mod utils; -#[derive(Debug, Default)] -pub struct ExecutorContext { - pub tracing_context: Option, -} - -/// The procedure executor that accepts ddl, region migration task etc. -#[async_trait::async_trait] -pub trait ProcedureExecutor: Send + Sync { - /// Submit a ddl task - async fn submit_ddl_task( - &self, - ctx: &ExecutorContext, - request: SubmitDdlTaskRequest, - ) -> Result; - - /// Add a region follower - async fn add_region_follower( - &self, - _ctx: &ExecutorContext, - _request: AddRegionFollowerRequest, - ) -> Result<()> { - UnsupportedSnafu { - operation: "add_region_follower", - } - .fail() - } - - /// Remove a region follower - async fn remove_region_follower( - &self, - _ctx: &ExecutorContext, - _request: RemoveRegionFollowerRequest, - ) -> Result<()> { - UnsupportedSnafu { - operation: "remove_region_follower", - } - .fail() - } - - /// Submit a region migration task - async fn migrate_region( - &self, - ctx: &ExecutorContext, - request: MigrateRegionRequest, - ) -> Result; - - /// Query the procedure state by its id - async fn query_procedure_state( - &self, - ctx: &ExecutorContext, - pid: &str, - ) -> Result; - - async fn list_procedures(&self, ctx: &ExecutorContext) -> Result; -} - -pub type ProcedureExecutorRef = Arc; - /// Metadata allocated to a table. #[derive(Default)] pub struct TableMetadata { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 30de633c1d..8648338080 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use api::v1::meta::ProcedureDetailResponse; use common_procedure::{ watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, }; @@ -37,16 +36,16 @@ use crate::ddl::drop_flow::DropFlowProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::drop_view::DropViewProcedure; use crate::ddl::truncate_table::TruncateTableProcedure; -use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor}; +use crate::ddl::{utils, DdlContext}; use crate::error::{ - EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu, - QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, - TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, - UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu, + EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result, + SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu, + UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu, }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use crate::procedure_executor::ExecutorContext; #[cfg(feature = "enterprise")] use crate::rpc::ddl::trigger::CreateTriggerTask; #[cfg(feature = "enterprise")] @@ -61,8 +60,6 @@ use crate::rpc::ddl::{ CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask, }; -use crate::rpc::procedure; -use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; use crate::rpc::router::RegionRoute; pub type DdlManagerRef = Arc; @@ -406,6 +403,70 @@ impl DdlManager { Ok((procedure_id, output)) } + + pub async fn submit_ddl_task( + &self, + ctx: &ExecutorContext, + request: SubmitDdlTaskRequest, + ) -> Result { + let span = ctx + .tracing_context + .as_ref() + .map(TracingContext::from_w3c) + .unwrap_or_else(TracingContext::from_current_span) + .attach(tracing::info_span!("DdlManager::submit_ddl_task")); + async move { + debug!("Submitting Ddl task: {:?}", request.task); + match request.task { + CreateTable(create_table_task) => { + handle_create_table_task(self, create_table_task).await + } + DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await, + AlterTable(alter_table_task) => { + handle_alter_table_task(self, alter_table_task).await + } + TruncateTable(truncate_table_task) => { + handle_truncate_table_task(self, truncate_table_task).await + } + CreateLogicalTables(create_table_tasks) => { + handle_create_logical_table_tasks(self, create_table_tasks).await + } + AlterLogicalTables(alter_table_tasks) => { + handle_alter_logical_table_tasks(self, alter_table_tasks).await + } + DropLogicalTables(_) => todo!(), + CreateDatabase(create_database_task) => { + handle_create_database_task(self, create_database_task).await + } + DropDatabase(drop_database_task) => { + handle_drop_database_task(self, drop_database_task).await + } + AlterDatabase(alter_database_task) => { + handle_alter_database_task(self, alter_database_task).await + } + CreateFlow(create_flow_task) => { + handle_create_flow_task(self, create_flow_task, request.query_context.into()) + .await + } + DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await, + CreateView(create_view_task) => { + handle_create_view_task(self, create_view_task).await + } + DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await, + #[cfg(feature = "enterprise")] + CreateTrigger(create_trigger_task) => { + handle_create_trigger_task( + self, + create_trigger_task, + request.query_context.into(), + ) + .await + } + } + } + .trace(span) + .await + } } async fn handle_truncate_table_task( @@ -712,6 +773,8 @@ async fn handle_create_trigger_task( query_context: QueryContext, ) -> Result { let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else { + use crate::error::UnsupportedSnafu; + return UnsupportedSnafu { operation: "create trigger", } @@ -788,114 +851,6 @@ async fn handle_create_view_task( }) } -/// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it. -#[async_trait::async_trait] -impl ProcedureExecutor for DdlManager { - async fn submit_ddl_task( - &self, - ctx: &ExecutorContext, - request: SubmitDdlTaskRequest, - ) -> Result { - let span = ctx - .tracing_context - .as_ref() - .map(TracingContext::from_w3c) - .unwrap_or(TracingContext::from_current_span()) - .attach(tracing::info_span!("DdlManager::submit_ddl_task")); - async move { - debug!("Submitting Ddl task: {:?}", request.task); - match request.task { - CreateTable(create_table_task) => { - handle_create_table_task(self, create_table_task).await - } - DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await, - AlterTable(alter_table_task) => { - handle_alter_table_task(self, alter_table_task).await - } - TruncateTable(truncate_table_task) => { - handle_truncate_table_task(self, truncate_table_task).await - } - CreateLogicalTables(create_table_tasks) => { - handle_create_logical_table_tasks(self, create_table_tasks).await - } - AlterLogicalTables(alter_table_tasks) => { - handle_alter_logical_table_tasks(self, alter_table_tasks).await - } - DropLogicalTables(_) => todo!(), - CreateDatabase(create_database_task) => { - handle_create_database_task(self, create_database_task).await - } - DropDatabase(drop_database_task) => { - handle_drop_database_task(self, drop_database_task).await - } - AlterDatabase(alter_database_task) => { - handle_alter_database_task(self, alter_database_task).await - } - CreateFlow(create_flow_task) => { - handle_create_flow_task(self, create_flow_task, request.query_context.into()) - .await - } - #[cfg(feature = "enterprise")] - CreateTrigger(create_trigger_task) => { - handle_create_trigger_task( - self, - create_trigger_task, - request.query_context.into(), - ) - .await - } - DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await, - CreateView(create_view_task) => { - handle_create_view_task(self, create_view_task).await - } - DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await, - } - } - .trace(span) - .await - } - - async fn migrate_region( - &self, - _ctx: &ExecutorContext, - _request: MigrateRegionRequest, - ) -> Result { - UnsupportedSnafu { - operation: "migrate_region", - } - .fail() - } - - async fn query_procedure_state( - &self, - _ctx: &ExecutorContext, - pid: &str, - ) -> Result { - let pid = - ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?; - - let state = self - .procedure_manager - .procedure_state(pid) - .await - .context(QueryProcedureSnafu)? - .context(ProcedureNotFoundSnafu { - pid: pid.to_string(), - })?; - - Ok(procedure::procedure_state_to_pb_response(&state)) - } - - async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result { - let metas = self - .procedure_manager - .list_procedures() - .await - .context(QueryProcedureSnafu)?; - Ok(procedure::procedure_details_to_pb_response(metas)) - } -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index ac02544f68..fa53ab0a6c 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -37,6 +37,7 @@ pub mod node_expiry_listener; pub mod node_manager; pub mod peer; pub mod poison_key; +pub mod procedure_executor; pub mod range_stream; pub mod reconciliation; pub mod region_keeper; diff --git a/src/common/meta/src/procedure_executor.rs b/src/common/meta/src/procedure_executor.rs new file mode 100644 index 0000000000..992f0ae8da --- /dev/null +++ b/src/common/meta/src/procedure_executor.rs @@ -0,0 +1,155 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::meta::ProcedureDetailResponse; +use common_procedure::{ProcedureId, ProcedureManagerRef}; +use common_telemetry::tracing_context::W3cTrace; +use snafu::{OptionExt, ResultExt}; + +use crate::ddl_manager::DdlManagerRef; +use crate::error::{ + ParseProcedureIdSnafu, ProcedureNotFoundSnafu, QueryProcedureSnafu, Result, UnsupportedSnafu, +}; +use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; +use crate::rpc::procedure::{ + self, AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, + ProcedureStateResponse, RemoveRegionFollowerRequest, +}; + +/// The context of procedure executor. +#[derive(Debug, Default)] +pub struct ExecutorContext { + pub tracing_context: Option, +} + +/// The procedure executor that accepts ddl, region migration task etc. +#[async_trait::async_trait] +pub trait ProcedureExecutor: Send + Sync { + /// Submit a ddl task + async fn submit_ddl_task( + &self, + ctx: &ExecutorContext, + request: SubmitDdlTaskRequest, + ) -> Result; + + /// Add a region follower + async fn add_region_follower( + &self, + _ctx: &ExecutorContext, + _request: AddRegionFollowerRequest, + ) -> Result<()> { + UnsupportedSnafu { + operation: "add_region_follower", + } + .fail() + } + + /// Remove a region follower + async fn remove_region_follower( + &self, + _ctx: &ExecutorContext, + _request: RemoveRegionFollowerRequest, + ) -> Result<()> { + UnsupportedSnafu { + operation: "remove_region_follower", + } + .fail() + } + + /// Submit a region migration task + async fn migrate_region( + &self, + ctx: &ExecutorContext, + request: MigrateRegionRequest, + ) -> Result; + + /// Query the procedure state by its id + async fn query_procedure_state( + &self, + ctx: &ExecutorContext, + pid: &str, + ) -> Result; + + async fn list_procedures(&self, ctx: &ExecutorContext) -> Result; +} + +pub type ProcedureExecutorRef = Arc; + +/// The local procedure executor that accepts ddl, region migration task etc. +pub struct LocalProcedureExecutor { + pub ddl_manager: DdlManagerRef, + pub procedure_manager: ProcedureManagerRef, +} + +impl LocalProcedureExecutor { + pub fn new(ddl_manager: DdlManagerRef, procedure_manager: ProcedureManagerRef) -> Self { + Self { + ddl_manager, + procedure_manager, + } + } +} + +#[async_trait::async_trait] +impl ProcedureExecutor for LocalProcedureExecutor { + async fn submit_ddl_task( + &self, + ctx: &ExecutorContext, + request: SubmitDdlTaskRequest, + ) -> Result { + self.ddl_manager.submit_ddl_task(ctx, request).await + } + + async fn migrate_region( + &self, + _ctx: &ExecutorContext, + _request: MigrateRegionRequest, + ) -> Result { + UnsupportedSnafu { + operation: "migrate_region", + } + .fail() + } + + async fn query_procedure_state( + &self, + _ctx: &ExecutorContext, + pid: &str, + ) -> Result { + let pid = + ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?; + + let state = self + .procedure_manager + .procedure_state(pid) + .await + .context(QueryProcedureSnafu)? + .with_context(|| ProcedureNotFoundSnafu { + pid: pid.to_string(), + })?; + + Ok(procedure::procedure_state_to_pb_response(&state)) + } + + async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result { + let metas = self + .procedure_manager + .list_procedures() + .await + .context(QueryProcedureSnafu)?; + Ok(procedure::procedure_details_to_pb_response(metas)) + } +} diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index d065253d9b..6af119381c 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -24,11 +24,11 @@ use catalog::CatalogManagerRef; use common_base::Plugins; use common_error::ext::BoxedError; use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef}; -use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::flow::FlowMetadataManagerRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::{Flownode, NodeManagerRef}; +use common_meta::procedure_executor::ProcedureExecutorRef; use common_query::Output; use common_runtime::JoinHandle; use common_telemetry::tracing::info; diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index e9c132da42..113a8ad879 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -20,11 +20,11 @@ use catalog::CatalogManagerRef; use common_base::Plugins; use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef}; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; -use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; +use common_meta::procedure_executor::ProcedureExecutorRef; use operator::delete::Deleter; use operator::flow::FlowServiceOperator; use operator::insert::Inserter; @@ -157,7 +157,8 @@ impl FrontendBuilder { self.catalog_manager.clone(), )); - let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); + let flow_metadata_manager: Arc = + Arc::new(FlowMetadataManager::new(kv_backend.clone())); let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone()); let query_engine = QueryEngineFactory::new_with_plugins( diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 1e4f49077b..257d5a2d9d 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -34,12 +34,12 @@ use common_meta::cluster::{ ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole, }; use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat}; -use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; use common_meta::error::{ self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu, }; use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager}; use common_meta::kv_backend::KvBackendRef; +use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor}; use common_meta::range_stream::PaginationStream; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::procedure::{ diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index dc5da88c46..b0fd25ffcb 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -25,7 +25,7 @@ use common_base::Plugins; use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache_invalidator::CacheInvalidatorRef; -use common_meta::ddl::ProcedureExecutorRef; +use common_meta::ddl_manager::DdlManagerRef; use common_meta::distributed_time_constants; use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_meta::key::TableMetadataManagerRef; @@ -426,7 +426,7 @@ pub struct Metasrv { election: Option, procedure_manager: ProcedureManagerRef, mailbox: MailboxRef, - procedure_executor: ProcedureExecutorRef, + ddl_manager: DdlManagerRef, wal_options_allocator: WalOptionsAllocatorRef, table_metadata_manager: TableMetadataManagerRef, runtime_switch_manager: RuntimeSwitchManagerRef, @@ -677,8 +677,8 @@ impl Metasrv { &self.mailbox } - pub fn procedure_executor(&self) -> &ProcedureExecutorRef { - &self.procedure_executor + pub fn ddl_manager(&self) -> &DdlManagerRef { + &self.ddl_manager } pub fn procedure_manager(&self) -> &ProcedureManagerRef { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 245a5c87d3..547267a7b5 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -461,7 +461,7 @@ impl MetasrvBuilder { election, procedure_manager, mailbox, - procedure_executor: ddl_manager, + ddl_manager, wal_options_allocator, table_metadata_manager, runtime_switch_manager, diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index c388e74e15..ed0612268b 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -21,7 +21,7 @@ use api::v1::meta::{ ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, ResponseHeader, }; -use common_meta::ddl::ExecutorContext; +use common_meta::procedure_executor::ExecutorContext; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; use common_meta::rpc::procedure; use common_telemetry::warn; @@ -98,7 +98,7 @@ impl procedure_service_server::ProcedureService for Metasrv { .context(error::ConvertProtoDataSnafu)?; let resp = self - .procedure_executor() + .ddl_manager() .submit_ddl_task( &ExecutorContext { tracing_context: Some(header.tracing_context), diff --git a/src/operator/src/procedure.rs b/src/operator/src/procedure.rs index 87f805acb1..00616cb72a 100644 --- a/src/operator/src/procedure.rs +++ b/src/operator/src/procedure.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use catalog::CatalogManagerRef; use common_error::ext::BoxedError; use common_function::handlers::ProcedureServiceHandler; -use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef}; +use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef}; use common_meta::rpc::procedure::{ AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, RemoveRegionFollowerRequest, diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 698396528f..f9b99578cd 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -36,12 +36,12 @@ use client::RecordBatches; use common_error::ext::BoxedError; use common_meta::cache::TableRouteCacheRef; use common_meta::cache_invalidator::CacheInvalidatorRef; -use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; +use common_meta::procedure_executor::ProcedureExecutorRef; use common_query::Output; use common_telemetry::tracing; use common_time::range::TimestampRange; diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 904fd45a47..7e9c4c3e05 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -31,10 +31,10 @@ use common_catalog::{format_full_flow_name, format_full_table_name}; use common_error::ext::BoxedError; use common_meta::cache_invalidator::Context; use common_meta::ddl::create_flow::FlowType; -use common_meta::ddl::ExecutorContext; use common_meta::instruction::CacheIdent; use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::key::NAME_PATTERN; +use common_meta::procedure_executor::ExecutorContext; #[cfg(feature = "enterprise")] use common_meta::rpc::ddl::trigger::CreateTriggerTask; use common_meta::rpc::ddl::{ diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index e14b08e15e..253f770363 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -34,6 +34,7 @@ use common_meta::ddl_manager::DdlManager; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; +use common_meta::procedure_executor::LocalProcedureExecutor; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; @@ -213,7 +214,7 @@ impl GreptimeDbStandaloneBuilder { flow_id_sequence, )); - let ddl_task_executor = Arc::new( + let ddl_manager = Arc::new( DdlManager::try_new( DdlContext { node_manager: node_manager.clone(), @@ -231,6 +232,10 @@ impl GreptimeDbStandaloneBuilder { ) .unwrap(), ); + let procedure_executor = Arc::new(LocalProcedureExecutor::new( + ddl_manager, + procedure_manager.clone(), + )); let server_addr = opts.frontend_options().grpc.server_addr.clone(); @@ -240,7 +245,7 @@ impl GreptimeDbStandaloneBuilder { cache_registry.clone(), catalog_manager.clone(), node_manager.clone(), - ddl_task_executor.clone(), + procedure_executor.clone(), Arc::new(ProcessManager::new(server_addr, None)), ) .with_plugin(plugins) @@ -263,7 +268,7 @@ impl GreptimeDbStandaloneBuilder { catalog_manager.clone(), kv_backend.clone(), cache_registry.clone(), - ddl_task_executor.clone(), + procedure_executor.clone(), node_manager.clone(), ) .await