refactor: remove procedure executor from DDL manager (#6625)

* refactor: remove procedure executor from DDL manager

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from  CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-01 17:33:47 +08:00
committed by GitHub
parent 9e2f793b04
commit 19ad9a7f85
17 changed files with 269 additions and 212 deletions

View File

@@ -16,8 +16,8 @@ use api::v1::meta::ProcedureStatus;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::datanode::RegionStat;
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use meta_client::MetaClientRef;

View File

@@ -34,13 +34,14 @@ use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::datanode::RegionStat;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
@@ -640,9 +641,8 @@ impl StartCommand {
flow_metadata_allocator: flow_metadata_allocator.clone(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
};
let procedure_manager_c = procedure_manager.clone();
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
.context(error::InitDdlManagerSnafu)?;
#[cfg(feature = "enterprise")]
let ddl_manager = {
@@ -650,7 +650,11 @@ impl StartCommand {
plugins.get();
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
};
let ddl_task_executor: ProcedureExecutorRef = Arc::new(ddl_manager);
let procedure_executor = Arc::new(LocalProcedureExecutor::new(
Arc::new(ddl_manager),
procedure_manager.clone(),
));
let fe_instance = FrontendBuilder::new(
fe_opts.clone(),
@@ -658,7 +662,7 @@ impl StartCommand {
layered_cache_registry.clone(),
catalog_manager.clone(),
node_manager.clone(),
ddl_task_executor.clone(),
procedure_executor.clone(),
process_manager,
)
.with_plugin(plugins.clone())
@@ -683,7 +687,7 @@ impl StartCommand {
catalog_manager.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
ddl_task_executor.clone(),
procedure_executor,
node_manager,
)
.await

View File

@@ -15,25 +15,17 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::meta::ProcedureDetailResponse;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::error::{Result, UnsupportedSnafu};
use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::region_registry::LeaderRegionRegistryRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use crate::DatanodeId;
pub mod alter_database;
@@ -59,64 +51,6 @@ pub(crate) mod tests;
pub mod truncate_table;
pub mod utils;
#[derive(Debug, Default)]
pub struct ExecutorContext {
pub tracing_context: Option<W3cTrace>,
}
/// The procedure executor that accepts ddl, region migration task etc.
#[async_trait::async_trait]
pub trait ProcedureExecutor: Send + Sync {
/// Submit a ddl task
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
/// Add a region follower
async fn add_region_follower(
&self,
_ctx: &ExecutorContext,
_request: AddRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "add_region_follower",
}
.fail()
}
/// Remove a region follower
async fn remove_region_follower(
&self,
_ctx: &ExecutorContext,
_request: RemoveRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "remove_region_follower",
}
.fail()
}
/// Submit a region migration task
async fn migrate_region(
&self,
ctx: &ExecutorContext,
request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse>;
/// Query the procedure state by its id
async fn query_procedure_state(
&self,
ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse>;
async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
}
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
/// Metadata allocated to a table.
#[derive(Default)]
pub struct TableMetadata {

View File

@@ -14,7 +14,6 @@
use std::sync::Arc;
use api::v1::meta::ProcedureDetailResponse;
use common_procedure::{
watcher, BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId,
};
@@ -37,16 +36,16 @@ use crate::ddl::drop_flow::DropFlowProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::drop_view::DropViewProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
use crate::ddl::{utils, DdlContext};
use crate::error::{
EmptyDdlTasksSnafu, ParseProcedureIdSnafu, ProcedureNotFoundSnafu, ProcedureOutputSnafu,
QueryProcedureSnafu, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu,
TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu, UnsupportedSnafu, WaitProcedureSnafu,
EmptyDdlTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableInfoNotFoundSnafu, TableNotFoundSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu, WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
@@ -65,8 +64,6 @@ use crate::rpc::ddl::{
CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::rpc::router::RegionRoute;
pub type DdlManagerRef = Arc<DdlManager>;
@@ -418,6 +415,75 @@ impl DdlManager {
Ok((procedure_id, output))
}
pub async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let span = ctx
.tracing_context
.as_ref()
.map(TracingContext::from_w3c)
.unwrap_or_else(TracingContext::from_current_span)
.attach(tracing::info_span!("DdlManager::submit_ddl_task"));
async move {
debug!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
handle_create_table_task(self, create_table_task).await
}
DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
AlterTable(alter_table_task) => {
handle_alter_table_task(self, alter_table_task).await
}
TruncateTable(truncate_table_task) => {
handle_truncate_table_task(self, truncate_table_task).await
}
CreateLogicalTables(create_table_tasks) => {
handle_create_logical_table_tasks(self, create_table_tasks).await
}
AlterLogicalTables(alter_table_tasks) => {
handle_alter_logical_table_tasks(self, alter_table_tasks).await
}
DropLogicalTables(_) => todo!(),
CreateDatabase(create_database_task) => {
handle_create_database_task(self, create_database_task).await
}
DropDatabase(drop_database_task) => {
handle_drop_database_task(self, drop_database_task).await
}
AlterDatabase(alter_database_task) => {
handle_alter_database_task(self, alter_database_task).await
}
CreateFlow(create_flow_task) => {
handle_create_flow_task(self, create_flow_task, request.query_context.into())
.await
}
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
CreateView(create_view_task) => {
handle_create_view_task(self, create_view_task).await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
#[cfg(feature = "enterprise")]
CreateTrigger(create_trigger_task) => {
handle_create_trigger_task(
self,
create_trigger_task,
request.query_context.into(),
)
.await
}
#[cfg(feature = "enterprise")]
DropTrigger(drop_trigger_task) => {
handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
.await
}
}
}
.trace(span)
.await
}
}
async fn handle_truncate_table_task(
@@ -667,6 +733,8 @@ async fn handle_drop_trigger_task(
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
use crate::error::UnsupportedSnafu;
return UnsupportedSnafu {
operation: "drop trigger",
}
@@ -746,6 +814,8 @@ async fn handle_create_trigger_task(
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
use crate::error::UnsupportedSnafu;
return UnsupportedSnafu {
operation: "create trigger",
}
@@ -822,119 +892,6 @@ async fn handle_create_view_task(
})
}
/// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it.
#[async_trait::async_trait]
impl ProcedureExecutor for DdlManager {
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let span = ctx
.tracing_context
.as_ref()
.map(TracingContext::from_w3c)
.unwrap_or(TracingContext::from_current_span())
.attach(tracing::info_span!("DdlManager::submit_ddl_task"));
async move {
debug!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
handle_create_table_task(self, create_table_task).await
}
DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
AlterTable(alter_table_task) => {
handle_alter_table_task(self, alter_table_task).await
}
TruncateTable(truncate_table_task) => {
handle_truncate_table_task(self, truncate_table_task).await
}
CreateLogicalTables(create_table_tasks) => {
handle_create_logical_table_tasks(self, create_table_tasks).await
}
AlterLogicalTables(alter_table_tasks) => {
handle_alter_logical_table_tasks(self, alter_table_tasks).await
}
DropLogicalTables(_) => todo!(),
CreateDatabase(create_database_task) => {
handle_create_database_task(self, create_database_task).await
}
DropDatabase(drop_database_task) => {
handle_drop_database_task(self, drop_database_task).await
}
AlterDatabase(alter_database_task) => {
handle_alter_database_task(self, alter_database_task).await
}
CreateFlow(create_flow_task) => {
handle_create_flow_task(self, create_flow_task, request.query_context.into())
.await
}
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
CreateView(create_view_task) => {
handle_create_view_task(self, create_view_task).await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
#[cfg(feature = "enterprise")]
CreateTrigger(create_trigger_task) => {
handle_create_trigger_task(
self,
create_trigger_task,
request.query_context.into(),
)
.await
}
#[cfg(feature = "enterprise")]
DropTrigger(drop_trigger_task) => {
handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
.await
}
}
}
.trace(span)
.await
}
async fn migrate_region(
&self,
_ctx: &ExecutorContext,
_request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse> {
UnsupportedSnafu {
operation: "migrate_region",
}
.fail()
}
async fn query_procedure_state(
&self,
_ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse> {
let pid =
ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
let state = self
.procedure_manager
.procedure_state(pid)
.await
.context(QueryProcedureSnafu)?
.context(ProcedureNotFoundSnafu {
pid: pid.to_string(),
})?;
Ok(procedure::procedure_state_to_pb_response(&state))
}
async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
let metas = self
.procedure_manager
.list_procedures()
.await
.context(QueryProcedureSnafu)?;
Ok(procedure::procedure_details_to_pb_response(metas))
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -37,6 +37,7 @@ pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod poison_key;
pub mod procedure_executor;
pub mod range_stream;
pub mod reconciliation;
pub mod region_keeper;

View File

@@ -0,0 +1,155 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::meta::ProcedureDetailResponse;
use common_procedure::{ProcedureId, ProcedureManagerRef};
use common_telemetry::tracing_context::W3cTrace;
use snafu::{OptionExt, ResultExt};
use crate::ddl_manager::DdlManagerRef;
use crate::error::{
ParseProcedureIdSnafu, ProcedureNotFoundSnafu, QueryProcedureSnafu, Result, UnsupportedSnafu,
};
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{
self, AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
ProcedureStateResponse, RemoveRegionFollowerRequest,
};
/// The context of procedure executor.
#[derive(Debug, Default)]
pub struct ExecutorContext {
pub tracing_context: Option<W3cTrace>,
}
/// The procedure executor that accepts ddl, region migration task etc.
#[async_trait::async_trait]
pub trait ProcedureExecutor: Send + Sync {
/// Submit a ddl task
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
/// Add a region follower
async fn add_region_follower(
&self,
_ctx: &ExecutorContext,
_request: AddRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "add_region_follower",
}
.fail()
}
/// Remove a region follower
async fn remove_region_follower(
&self,
_ctx: &ExecutorContext,
_request: RemoveRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "remove_region_follower",
}
.fail()
}
/// Submit a region migration task
async fn migrate_region(
&self,
ctx: &ExecutorContext,
request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse>;
/// Query the procedure state by its id
async fn query_procedure_state(
&self,
ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse>;
async fn list_procedures(&self, ctx: &ExecutorContext) -> Result<ProcedureDetailResponse>;
}
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
/// The local procedure executor that accepts ddl, region migration task etc.
pub struct LocalProcedureExecutor {
pub ddl_manager: DdlManagerRef,
pub procedure_manager: ProcedureManagerRef,
}
impl LocalProcedureExecutor {
pub fn new(ddl_manager: DdlManagerRef, procedure_manager: ProcedureManagerRef) -> Self {
Self {
ddl_manager,
procedure_manager,
}
}
}
#[async_trait::async_trait]
impl ProcedureExecutor for LocalProcedureExecutor {
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
self.ddl_manager.submit_ddl_task(ctx, request).await
}
async fn migrate_region(
&self,
_ctx: &ExecutorContext,
_request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse> {
UnsupportedSnafu {
operation: "migrate_region",
}
.fail()
}
async fn query_procedure_state(
&self,
_ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse> {
let pid =
ProcedureId::parse_str(pid).with_context(|_| ParseProcedureIdSnafu { key: pid })?;
let state = self
.procedure_manager
.procedure_state(pid)
.await
.context(QueryProcedureSnafu)?
.with_context(|| ProcedureNotFoundSnafu {
pid: pid.to_string(),
})?;
Ok(procedure::procedure_state_to_pb_response(&state))
}
async fn list_procedures(&self, _ctx: &ExecutorContext) -> Result<ProcedureDetailResponse> {
let metas = self
.procedure_manager
.list_procedures()
.await
.context(QueryProcedureSnafu)?;
Ok(procedure::procedure_details_to_pb_response(metas))
}
}

View File

@@ -24,11 +24,11 @@ use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::tracing::info;

View File

@@ -40,12 +40,12 @@ use common_base::Plugins;
use common_config::KvBackendConfig;
use common_error::ext::{BoxedError, ErrorExt};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;

View File

@@ -20,11 +20,11 @@ use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_meta::cache::{LayeredCacheRegistryRef, TableRouteCacheRef};
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use dashmap::DashMap;
use operator::delete::Deleter;
use operator::flow::FlowServiceOperator;
@@ -158,7 +158,8 @@ impl FrontendBuilder {
self.catalog_manager.clone(),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_metadata_manager: Arc<FlowMetadataManager> =
Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let flow_service = FlowServiceOperator::new(flow_metadata_manager, node_manager.clone());
let query_engine = QueryEngineFactory::new_with_plugins(

View File

@@ -34,12 +34,12 @@ use common_meta::cluster::{
ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
};
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::error::{
self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
};
use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::procedure::{

View File

@@ -25,7 +25,7 @@ use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::TableMetadataManagerRef;
@@ -436,7 +436,7 @@ pub struct Metasrv {
election: Option<ElectionRef>,
procedure_manager: ProcedureManagerRef,
mailbox: MailboxRef,
procedure_executor: ProcedureExecutorRef,
ddl_manager: DdlManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
runtime_switch_manager: RuntimeSwitchManagerRef,
@@ -687,8 +687,8 @@ impl Metasrv {
&self.mailbox
}
pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
&self.procedure_executor
pub fn ddl_manager(&self) -> &DdlManagerRef {
&self.ddl_manager
}
pub fn procedure_manager(&self) -> &ProcedureManagerRef {

View File

@@ -464,7 +464,7 @@ impl MetasrvBuilder {
election,
procedure_manager,
mailbox,
procedure_executor: ddl_manager,
ddl_manager,
wal_options_allocator,
table_metadata_manager,
runtime_switch_manager,

View File

@@ -21,7 +21,7 @@ use api::v1::meta::{
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
ResponseHeader,
};
use common_meta::ddl::ExecutorContext;
use common_meta::procedure_executor::ExecutorContext;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
use common_meta::rpc::procedure;
use common_telemetry::warn;
@@ -100,7 +100,7 @@ impl procedure_service_server::ProcedureService for Metasrv {
.context(error::ConvertProtoDataSnafu)?;
let resp = self
.procedure_executor()
.ddl_manager()
.submit_ddl_task(
&ExecutorContext {
tracing_context: Some(header.tracing_context),

View File

@@ -16,7 +16,7 @@ use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_function::handlers::ProcedureServiceHandler;
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,

View File

@@ -36,12 +36,12 @@ use client::RecordBatches;
use common_error::ext::BoxedError;
use common_meta::cache::TableRouteCacheRef;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::ProcedureExecutorRef;
use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;

View File

@@ -31,10 +31,10 @@ use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::create_flow::FlowType;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::key::NAME_PATTERN;
use common_meta::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]

View File

@@ -34,6 +34,7 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
@@ -213,7 +214,7 @@ impl GreptimeDbStandaloneBuilder {
flow_id_sequence,
));
let ddl_task_executor = Arc::new(
let ddl_manager = Arc::new(
DdlManager::try_new(
DdlContext {
node_manager: node_manager.clone(),
@@ -231,6 +232,10 @@ impl GreptimeDbStandaloneBuilder {
)
.unwrap(),
);
let procedure_executor = Arc::new(LocalProcedureExecutor::new(
ddl_manager,
procedure_manager.clone(),
));
let server_addr = opts.frontend_options().grpc.server_addr.clone();
@@ -240,7 +245,7 @@ impl GreptimeDbStandaloneBuilder {
cache_registry.clone(),
catalog_manager.clone(),
node_manager.clone(),
ddl_task_executor.clone(),
procedure_executor.clone(),
Arc::new(ProcessManager::new(server_addr, None)),
)
.with_plugin(plugins)
@@ -263,7 +268,7 @@ impl GreptimeDbStandaloneBuilder {
catalog_manager.clone(),
kv_backend.clone(),
cache_registry.clone(),
ddl_task_executor.clone(),
procedure_executor.clone(),
node_manager.clone(),
)
.await