feat: add catalog_manager to ProcedureServiceHandler (#5873)

This commit is contained in:
Weny Xu
2025-04-10 14:55:46 +08:00
committed by GitHub
parent e052c65a58
commit 54ef29f394
6 changed files with 26 additions and 2 deletions

1
Cargo.lock generated
View File

@@ -2014,6 +2014,7 @@ dependencies = [
"arc-swap",
"async-trait",
"bincode",
"catalog",
"chrono",
"common-base",
"common-catalog",

View File

@@ -18,6 +18,7 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
bincode = "1.3"
catalog.workspace = true
chrono.workspace = true
common-base.workspace = true
common-catalog.workspace = true

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
@@ -72,6 +73,9 @@ pub trait ProcedureServiceHandler: Send + Sync {
/// Remove a region follower from a region.
async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
/// Get the catalog manager
fn catalog_manager(&self) -> &CatalogManagerRef;
}
/// This flow service handler is only use for flush flow for now.

View File

@@ -34,6 +34,7 @@ impl FunctionState {
use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
@@ -80,6 +81,10 @@ impl FunctionState {
) -> Result<()> {
Ok(())
}
fn catalog_manager(&self) -> &CatalogManagerRef {
unimplemented!()
}
}
#[async_trait]

View File

@@ -152,6 +152,7 @@ impl FrontendBuilder {
let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
self.procedure_executor.clone(),
self.catalog_manager.clone(),
));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_function::handlers::ProcedureServiceHandler;
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
@@ -28,11 +29,18 @@ use snafu::ResultExt;
#[derive(Clone)]
pub struct ProcedureServiceOperator {
procedure_executor: ProcedureExecutorRef,
catalog_manager: CatalogManagerRef,
}
impl ProcedureServiceOperator {
pub fn new(procedure_executor: ProcedureExecutorRef) -> Self {
Self { procedure_executor }
pub fn new(
procedure_executor: ProcedureExecutorRef,
catalog_manager: CatalogManagerRef,
) -> Self {
Self {
procedure_executor,
catalog_manager,
}
}
}
@@ -75,4 +83,8 @@ impl ProcedureServiceHandler for ProcedureServiceOperator {
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)
}
fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
}