From a31f0e255b03e0283d44a94f5cbbc6f59bcd42d0 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 26 Mar 2025 16:05:15 +0800 Subject: [PATCH] feat: introduce `RegionFollowerClient` trait (#5771) * chore: expose AskLeader * feat: introduce `RegionFollowerClient` trait * feat: build meta client with region follower client --- Cargo.lock | 1 + src/cmd/src/datanode.rs | 11 +++-- src/cmd/src/flownode.rs | 11 +++-- src/cmd/src/frontend.rs | 11 +++-- src/common/meta/src/ddl.rs | 31 +++++++++++++- src/meta-client/Cargo.toml | 1 + src/meta-client/src/client.rs | 77 ++++++++++++++++++++++++++++++++++- src/meta-client/src/lib.rs | 12 +++++- 8 files changed, 138 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 52b93a73f3..71c7f1c6ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6647,6 +6647,7 @@ version = "0.14.0" dependencies = [ "api", "async-trait", + "common-base", "common-error", "common-grpc", "common-macro", diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index daa86326f2..600f2fbf80 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -295,10 +295,13 @@ impl StartCommand { msg: "'meta_client_options'", })?; - let meta_client = - meta_client::create_meta_client(MetaClientType::Datanode { member_id }, meta_config) - .await - .context(MetaClientInitSnafu)?; + let meta_client = meta_client::create_meta_client( + MetaClientType::Datanode { member_id }, + meta_config, + None, + ) + .await + .context(MetaClientInitSnafu)?; let meta_backend = Arc::new(MetaKvBackend { client: meta_client.clone(), diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 6bd02a6a46..fecb4d476d 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -249,10 +249,13 @@ impl StartCommand { msg: "'meta_client_options'", })?; - let meta_client = - meta_client::create_meta_client(MetaClientType::Flownode { member_id }, meta_config) - .await - .context(MetaClientInitSnafu)?; + let meta_client = meta_client::create_meta_client( + MetaClientType::Flownode { member_id }, + meta_config, + None, + ) + .await + .context(MetaClientInitSnafu)?; let cache_max_capacity = meta_config.metadata_cache_max_capacity; let cache_ttl = meta_config.metadata_cache_ttl; diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 5c99bcc779..5b10afa051 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -288,10 +288,13 @@ impl StartCommand { let cache_ttl = meta_client_options.metadata_cache_ttl; let cache_tti = meta_client_options.metadata_cache_tti; - let meta_client = - meta_client::create_meta_client(MetaClientType::Frontend, meta_client_options) - .await - .context(error::MetaClientInitSnafu)?; + let meta_client = meta_client::create_meta_client( + MetaClientType::Frontend, + meta_client_options, + Some(&plugins), + ) + .await + .context(error::MetaClientInitSnafu)?; // TODO(discord9): add helper function to ease the creation of cache registry&such let cached_meta_backend = diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 55a9a64c84..157db9c9ad 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -22,14 +22,17 @@ use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::cache_invalidator::CacheInvalidatorRef; use crate::ddl::flow_meta::FlowMetadataAllocatorRef; use crate::ddl::table_meta::TableMetadataAllocatorRef; -use crate::error::Result; +use crate::error::{Result, UnsupportedSnafu}; use crate::key::flow::FlowMetadataManagerRef; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; use crate::node_manager::NodeManagerRef; use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; -use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; +use crate::rpc::procedure::{ + AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, + RemoveRegionFollowerRequest, +}; use crate::DatanodeId; pub mod alter_database; @@ -70,6 +73,30 @@ pub trait ProcedureExecutor: Send + Sync { request: SubmitDdlTaskRequest, ) -> Result; + /// Add a region follower + async fn add_region_follower( + &self, + _ctx: &ExecutorContext, + _request: AddRegionFollowerRequest, + ) -> Result<()> { + UnsupportedSnafu { + operation: "add_region_follower", + } + .fail() + } + + /// Remove a region follower + async fn remove_region_follower( + &self, + _ctx: &ExecutorContext, + _request: RemoveRegionFollowerRequest, + ) -> Result<()> { + UnsupportedSnafu { + operation: "remove_region_follower", + } + .fail() + } + /// Submit a region migration task async fn migrate_region( &self, diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index cf9c256e5c..70a3fb581a 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] api.workspace = true async-trait.workspace = true +common-base.workspace = true common-error.workspace = true common-grpc.workspace = true common-macro.workspace = true diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index b0215d9e26..6a1e5f3335 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -21,9 +21,11 @@ mod cluster; mod store; mod util; +use std::fmt::Debug; use std::sync::Arc; use api::v1::meta::{ProcedureDetailResponse, Role}; +pub use ask_leader::AskLeader; use cluster::Client as ClusterClient; pub use cluster::ClusterKvBackend; use common_error::ext::BoxedError; @@ -33,13 +35,16 @@ use common_meta::cluster::{ }; use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat}; use common_meta::ddl::{ExecutorContext, ProcedureExecutor}; -use common_meta::error::{self as meta_error, ExternalSnafu, Result as MetaResult}; +use common_meta::error::{ + self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu, +}; use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager}; use common_meta::kv_backend::KvBackendRef; use common_meta::range_stream::PaginationStream; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::procedure::{ - MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, + AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, + RemoveRegionFollowerRequest, }; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, @@ -74,6 +79,7 @@ pub struct MetaClientBuilder { enable_store: bool, enable_procedure: bool, enable_access_cluster_info: bool, + region_follower: Option, channel_manager: Option, ddl_channel_manager: Option, heartbeat_channel_manager: Option, @@ -162,6 +168,13 @@ impl MetaClientBuilder { } } + pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self { + Self { + region_follower: Some(region_follower), + ..self + } + } + pub fn build(self) -> MetaClient { let mut client = if let Some(mgr) = self.channel_manager { MetaClient::with_channel_manager(self.id, mgr) @@ -204,6 +217,10 @@ impl MetaClientBuilder { )) } + if let Some(region_follower) = self.region_follower { + client.region_follower = Some(region_follower); + } + client } } @@ -216,6 +233,19 @@ pub struct MetaClient { store: Option, procedure: Option, cluster: Option, + region_follower: Option, +} + +pub type RegionFollowerClientRef = Arc; + +/// A trait for clients that can manage region followers. +#[async_trait::async_trait] +pub trait RegionFollowerClient: Sync + Send + Debug { + async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>; + + async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>; + + async fn start(&self, urls: &[&str]) -> Result<()>; } #[async_trait::async_trait] @@ -242,6 +272,44 @@ impl ProcedureExecutor for MetaClient { .context(meta_error::ExternalSnafu) } + async fn add_region_follower( + &self, + _ctx: &ExecutorContext, + request: AddRegionFollowerRequest, + ) -> MetaResult<()> { + if let Some(region_follower) = &self.region_follower { + region_follower + .add_region_follower(request) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } else { + UnsupportedSnafu { + operation: "add_region_follower", + } + .fail() + } + } + + async fn remove_region_follower( + &self, + _ctx: &ExecutorContext, + request: RemoveRegionFollowerRequest, + ) -> MetaResult<()> { + if let Some(region_follower) = &self.region_follower { + region_follower + .remove_region_follower(request) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } else { + UnsupportedSnafu { + operation: "remove_region_follower", + } + .fail() + } + } + async fn query_procedure_state( &self, _ctx: &ExecutorContext, @@ -375,6 +443,11 @@ impl MetaClient { { info!("MetaClient channel config: {:?}", self.channel_config()); + if let Some(client) = &mut self.region_follower { + let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::>(); + client.start(&urls).await?; + info!("Region follower client started"); + } if let Some(client) = &mut self.heartbeat { client.start(urls.clone()).await?; info!("Heartbeat client started"); diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 87eab997bc..36e94c6654 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -15,8 +15,10 @@ use std::sync::Arc; use std::time::Duration; +use client::RegionFollowerClientRef; +use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_telemetry::info; +use common_telemetry::{debug, info}; use serde::{Deserialize, Serialize}; use crate::client::MetaClientBuilder; @@ -73,6 +75,7 @@ pub type MetaClientRef = Arc; pub async fn create_meta_client( client_type: MetaClientType, meta_client_options: &MetaClientOptions, + plugins: Option<&Plugins>, ) -> error::Result { info!( "Creating {:?} instance with Metasrv addrs {:?}", @@ -98,6 +101,13 @@ pub async fn create_meta_client( if let MetaClientType::Frontend = client_type { let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout); builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config)); + if let Some(plugins) = plugins { + let region_follower = plugins.get::(); + if let Some(region_follower) = region_follower { + debug!("Region follower client found in plugins"); + builder = builder.with_region_follower(region_follower); + } + } } builder = builder