From 54ef29f3949f2ceed2bd8f401498a18c0ca9fe1e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 10 Apr 2025 14:55:46 +0800 Subject: [PATCH] feat: add `catalog_manager` to `ProcedureServiceHandler` (#5873) --- Cargo.lock | 1 + src/common/function/Cargo.toml | 1 + src/common/function/src/handlers.rs | 4 ++++ src/common/function/src/state.rs | 5 +++++ src/frontend/src/instance/builder.rs | 1 + src/operator/src/procedure.rs | 16 ++++++++++++++-- 6 files changed, 26 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b19e1bb75f..cba1fa8793 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2014,6 +2014,7 @@ dependencies = [ "arc-swap", "async-trait", "bincode", + "catalog", "chrono", "common-base", "common-catalog", diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 7a4c968a3e..73821a896a 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -18,6 +18,7 @@ api.workspace = true arc-swap = "1.0" async-trait.workspace = true bincode = "1.3" +catalog.workspace = true chrono.workspace = true common-base.workspace = true common-catalog.workspace = true diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index 1d994731d5..bcb6ce5460 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use async_trait::async_trait; +use catalog::CatalogManagerRef; use common_base::AffectedRows; use common_meta::rpc::procedure::{ AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, @@ -72,6 +73,9 @@ pub trait ProcedureServiceHandler: Send + Sync { /// Remove a region follower from a region. async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>; + + /// Get the catalog manager + fn catalog_manager(&self) -> &CatalogManagerRef; } /// This flow service handler is only use for flush flow for now. diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index 66f5463fa2..211f7e1438 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -34,6 +34,7 @@ impl FunctionState { use api::v1::meta::ProcedureStatus; use async_trait::async_trait; + use catalog::CatalogManagerRef; use common_base::AffectedRows; use common_meta::rpc::procedure::{ AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, @@ -80,6 +81,10 @@ impl FunctionState { ) -> Result<()> { Ok(()) } + + fn catalog_manager(&self) -> &CatalogManagerRef { + unimplemented!() + } } #[async_trait] diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 52b2463503..8503999b2c 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -152,6 +152,7 @@ impl FrontendBuilder { let procedure_service_handler = Arc::new(ProcedureServiceOperator::new( self.procedure_executor.clone(), + self.catalog_manager.clone(), )); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); diff --git a/src/operator/src/procedure.rs b/src/operator/src/procedure.rs index e2c27c024f..87f805acb1 100644 --- a/src/operator/src/procedure.rs +++ b/src/operator/src/procedure.rs @@ -13,6 +13,7 @@ // limitations under the License. use async_trait::async_trait; +use catalog::CatalogManagerRef; use common_error::ext::BoxedError; use common_function::handlers::ProcedureServiceHandler; use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef}; @@ -28,11 +29,18 @@ use snafu::ResultExt; #[derive(Clone)] pub struct ProcedureServiceOperator { procedure_executor: ProcedureExecutorRef, + catalog_manager: CatalogManagerRef, } impl ProcedureServiceOperator { - pub fn new(procedure_executor: ProcedureExecutorRef) -> Self { - Self { procedure_executor } + pub fn new( + procedure_executor: ProcedureExecutorRef, + catalog_manager: CatalogManagerRef, + ) -> Self { + Self { + procedure_executor, + catalog_manager, + } } } @@ -75,4 +83,8 @@ impl ProcedureServiceHandler for ProcedureServiceOperator { .map_err(BoxedError::new) .context(query_error::ProcedureServiceSnafu) } + + fn catalog_manager(&self) -> &CatalogManagerRef { + &self.catalog_manager + } }