feat: introduce RegionFollowerClient trait (#5771)

* chore: expose AskLeader

* feat: introduce `RegionFollowerClient` trait

* feat: build meta client with region follower client
This commit is contained in:
Weny Xu
2025-03-26 16:05:15 +08:00
committed by GitHub
parent 40b52f3b13
commit a31f0e255b
8 changed files with 138 additions and 17 deletions

1
Cargo.lock generated
View File

@@ -6647,6 +6647,7 @@ version = "0.14.0"
dependencies = [
"api",
"async-trait",
"common-base",
"common-error",
"common-grpc",
"common-macro",

View File

@@ -295,10 +295,13 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Datanode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Datanode { member_id },
meta_config,
None,
)
.await
.context(MetaClientInitSnafu)?;
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),

View File

@@ -249,10 +249,13 @@ impl StartCommand {
msg: "'meta_client_options'",
})?;
let meta_client =
meta_client::create_meta_client(MetaClientType::Flownode { member_id }, meta_config)
.await
.context(MetaClientInitSnafu)?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Flownode { member_id },
meta_config,
None,
)
.await
.context(MetaClientInitSnafu)?;
let cache_max_capacity = meta_config.metadata_cache_max_capacity;
let cache_ttl = meta_config.metadata_cache_ttl;

View File

@@ -288,10 +288,13 @@ impl StartCommand {
let cache_ttl = meta_client_options.metadata_cache_ttl;
let cache_tti = meta_client_options.metadata_cache_tti;
let meta_client =
meta_client::create_meta_client(MetaClientType::Frontend, meta_client_options)
.await
.context(error::MetaClientInitSnafu)?;
let meta_client = meta_client::create_meta_client(
MetaClientType::Frontend,
meta_client_options,
Some(&plugins),
)
.await
.context(error::MetaClientInitSnafu)?;
// TODO(discord9): add helper function to ease the creation of cache registry&such
let cached_meta_backend =

View File

@@ -22,14 +22,17 @@ use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::error::Result;
use crate::error::{Result, UnsupportedSnafu};
use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use crate::DatanodeId;
pub mod alter_database;
@@ -70,6 +73,30 @@ pub trait ProcedureExecutor: Send + Sync {
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
/// Add a region follower
async fn add_region_follower(
&self,
_ctx: &ExecutorContext,
_request: AddRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "add_region_follower",
}
.fail()
}
/// Remove a region follower
async fn remove_region_follower(
&self,
_ctx: &ExecutorContext,
_request: RemoveRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "remove_region_follower",
}
.fail()
}
/// Submit a region migration task
async fn migrate_region(
&self,

View File

@@ -10,6 +10,7 @@ workspace = true
[dependencies]
api.workspace = true
async-trait.workspace = true
common-base.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true

View File

@@ -21,9 +21,11 @@ mod cluster;
mod store;
mod util;
use std::fmt::Debug;
use std::sync::Arc;
use api::v1::meta::{ProcedureDetailResponse, Role};
pub use ask_leader::AskLeader;
use cluster::Client as ClusterClient;
pub use cluster::ClusterKvBackend;
use common_error::ext::BoxedError;
@@ -33,13 +35,16 @@ use common_meta::cluster::{
};
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::error::{self as meta_error, ExternalSnafu, Result as MetaResult};
use common_meta::error::{
self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
};
use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
use common_meta::kv_backend::KvBackendRef;
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::procedure::{
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -74,6 +79,7 @@ pub struct MetaClientBuilder {
enable_store: bool,
enable_procedure: bool,
enable_access_cluster_info: bool,
region_follower: Option<RegionFollowerClientRef>,
channel_manager: Option<ChannelManager>,
ddl_channel_manager: Option<ChannelManager>,
heartbeat_channel_manager: Option<ChannelManager>,
@@ -162,6 +168,13 @@ impl MetaClientBuilder {
}
}
pub fn with_region_follower(self, region_follower: RegionFollowerClientRef) -> Self {
Self {
region_follower: Some(region_follower),
..self
}
}
pub fn build(self) -> MetaClient {
let mut client = if let Some(mgr) = self.channel_manager {
MetaClient::with_channel_manager(self.id, mgr)
@@ -204,6 +217,10 @@ impl MetaClientBuilder {
))
}
if let Some(region_follower) = self.region_follower {
client.region_follower = Some(region_follower);
}
client
}
}
@@ -216,6 +233,19 @@ pub struct MetaClient {
store: Option<StoreClient>,
procedure: Option<ProcedureClient>,
cluster: Option<ClusterClient>,
region_follower: Option<RegionFollowerClientRef>,
}
pub type RegionFollowerClientRef = Arc<dyn RegionFollowerClient>;
/// A trait for clients that can manage region followers.
#[async_trait::async_trait]
pub trait RegionFollowerClient: Sync + Send + Debug {
async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
async fn start(&self, urls: &[&str]) -> Result<()>;
}
#[async_trait::async_trait]
@@ -242,6 +272,44 @@ impl ProcedureExecutor for MetaClient {
.context(meta_error::ExternalSnafu)
}
async fn add_region_follower(
&self,
_ctx: &ExecutorContext,
request: AddRegionFollowerRequest,
) -> MetaResult<()> {
if let Some(region_follower) = &self.region_follower {
region_follower
.add_region_follower(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
} else {
UnsupportedSnafu {
operation: "add_region_follower",
}
.fail()
}
}
async fn remove_region_follower(
&self,
_ctx: &ExecutorContext,
request: RemoveRegionFollowerRequest,
) -> MetaResult<()> {
if let Some(region_follower) = &self.region_follower {
region_follower
.remove_region_follower(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
} else {
UnsupportedSnafu {
operation: "remove_region_follower",
}
.fail()
}
}
async fn query_procedure_state(
&self,
_ctx: &ExecutorContext,
@@ -375,6 +443,11 @@ impl MetaClient {
{
info!("MetaClient channel config: {:?}", self.channel_config());
if let Some(client) = &mut self.region_follower {
let urls = urls.as_ref().iter().map(|u| u.as_ref()).collect::<Vec<_>>();
client.start(&urls).await?;
info!("Region follower client started");
}
if let Some(client) = &mut self.heartbeat {
client.start(urls.clone()).await?;
info!("Heartbeat client started");

View File

@@ -15,8 +15,10 @@
use std::sync::Arc;
use std::time::Duration;
use client::RegionFollowerClientRef;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::info;
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};
use crate::client::MetaClientBuilder;
@@ -73,6 +75,7 @@ pub type MetaClientRef = Arc<client::MetaClient>;
pub async fn create_meta_client(
client_type: MetaClientType,
meta_client_options: &MetaClientOptions,
plugins: Option<&Plugins>,
) -> error::Result<MetaClientRef> {
info!(
"Creating {:?} instance with Metasrv addrs {:?}",
@@ -98,6 +101,13 @@ pub async fn create_meta_client(
if let MetaClientType::Frontend = client_type {
let ddl_config = base_config.clone().timeout(meta_client_options.ddl_timeout);
builder = builder.ddl_channel_manager(ChannelManager::with_config(ddl_config));
if let Some(plugins) = plugins {
let region_follower = plugins.get::<RegionFollowerClientRef>();
if let Some(region_follower) = region_follower {
debug!("Region follower client found in plugins");
builder = builder.with_region_follower(region_follower);
}
}
}
builder = builder