refactor: make finding leader in metasrv client dynamic (#6343)

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-06-18 19:29:23 +08:00
committed by GitHub
parent 4b04c402b6
commit 6da8e00243
15 changed files with 199 additions and 108 deletions

View File

@@ -93,6 +93,7 @@ impl InstanceBuilder {
MetaClientType::Datanode { member_id },
meta_client_options,
Some(&plugins),
None,
)
.await
.context(MetaClientInitSnafu)?;

View File

@@ -283,6 +283,7 @@ impl StartCommand {
MetaClientType::Flownode { member_id },
meta_config,
None,
None,
)
.await
.context(MetaClientInitSnafu)?;

View File

@@ -313,6 +313,7 @@ impl StartCommand {
MetaClientType::Frontend,
meta_client_options,
Some(&plugins),
None,
)
.await
.context(error::MetaClientInitSnafu)?;

View File

@@ -201,8 +201,8 @@ impl ChannelManager {
"http"
};
let mut endpoint =
Endpoint::new(format!("{http_prefix}://{addr}")).context(CreateChannelSnafu)?;
let mut endpoint = Endpoint::new(format!("{http_prefix}://{addr}"))
.context(CreateChannelSnafu { addr })?;
if let Some(dur) = self.config().timeout {
endpoint = endpoint.timeout(dur);
@@ -237,7 +237,7 @@ impl ChannelManager {
if let Some(tls_config) = &self.inner.client_tls_config {
endpoint = endpoint
.tls_config(tls_config.clone())
.context(CreateChannelSnafu)?;
.context(CreateChannelSnafu { addr })?;
}
endpoint = endpoint

View File

@@ -52,8 +52,9 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create gRPC channel"))]
#[snafu(display("Failed to create gRPC channel from '{addr}'"))]
CreateChannel {
addr: String,
#[snafu(source)]
error: tonic::transport::Error,
#[snafu(implicit)]

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use common_config::Configurable;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::grpc::GrpcServer;
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
@@ -92,13 +92,7 @@ impl<'a> DatanodeServiceBuilder<'a> {
opts: &DatanodeOptions,
region_server: &RegionServer,
) -> GrpcServerBuilder {
let config = GrpcServerConfig {
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
};
GrpcServerBuilder::new(config, region_server.runtime())
GrpcServerBuilder::new(opts.grpc.as_config(), region_server.runtime())
.flight_handler(Arc::new(region_server.clone()))
.region_server_handler(Arc::new(region_server.clone()))
}

View File

@@ -22,7 +22,7 @@ use servers::error::Error as ServerError;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::grpc::{GrpcOptions, GrpcServer};
use servers::http::event::LogValidatorRef;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::interceptor::LogIngestInterceptorRef;
@@ -66,12 +66,7 @@ where
}
pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
let grpc_config = GrpcServerConfig {
max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
tls: opts.tls.clone(),
};
let builder = GrpcServerBuilder::new(grpc_config, common_runtime::global_runtime())
let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
.with_tls_config(opts.tls.clone())
.context(error::InvalidTlsConfigSnafu)?;
Ok(builder)

View File

@@ -25,7 +25,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use api::v1::meta::{ProcedureDetailResponse, Role};
pub use ask_leader::AskLeader;
pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
use cluster::Client as ClusterClient;
pub use cluster::ClusterKvBackend;
use common_error::ext::BoxedError;
@@ -247,6 +247,8 @@ pub trait RegionFollowerClient: Sync + Send + Debug {
async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
async fn start(&self, urls: &[&str]) -> Result<()>;
async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
#[async_trait::async_trait]
@@ -469,6 +471,43 @@ impl MetaClient {
Ok(())
}
/// Start the client with a [LeaderProvider] and other Metasrv peers' addresses.
pub(crate) async fn start_with<U, A>(
&mut self,
leader_provider: LeaderProviderRef,
peers: A,
) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]> + Clone,
{
if let Some(client) = &self.region_follower {
info!("Starting region follower client ...");
client.start_with(leader_provider.clone()).await?;
}
if let Some(client) = &self.heartbeat {
info!("Starting heartbeat client ...");
client.start_with(leader_provider.clone()).await?;
}
if let Some(client) = &mut self.store {
info!("Starting store client ...");
client.start(peers.clone()).await?;
}
if let Some(client) = &self.procedure {
info!("Starting procedure client ...");
client.start_with(leader_provider.clone()).await?;
}
if let Some(client) = &mut self.cluster {
info!("Starting cluster client ...");
client.start_with(leader_provider).await?;
}
Ok(())
}
/// Ask the leader address of `metasrv`, and the heartbeat component
/// needs to create a bidirectional streaming to the leader.
pub async fn ask_leader(&self) -> Result<String> {

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
use async_trait::async_trait;
use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
@@ -30,6 +32,19 @@ use crate::client::Id;
use crate::error;
use crate::error::Result;
pub type LeaderProviderRef = Arc<dyn LeaderProvider>;
/// Provide [MetaClient] a Metasrv leader's address.
#[async_trait]
pub trait LeaderProvider: Debug + Send + Sync {
/// Get the leader of the Metasrv. If it returns `None`, or the leader is outdated,
/// you can use `ask_leader` to find a new one.
fn leader(&self) -> Option<String>;
/// Find the current leader of the Metasrv.
async fn ask_leader(&self) -> Result<String>;
}
#[derive(Debug)]
struct LeadershipGroup {
leader: Option<String>,
@@ -155,3 +170,14 @@ impl AskLeader {
))
}
}
#[async_trait]
impl LeaderProvider for AskLeader {
fn leader(&self) -> Option<String> {
self.get_leader()
}
async fn ask_leader(&self) -> Result<String> {
self.ask_leader().await
}
}

View File

@@ -38,7 +38,7 @@ use tonic::transport::Channel;
use tonic::Status;
use crate::client::ask_leader::AskLeader;
use crate::client::{util, Id};
use crate::client::{util, Id, LeaderProviderRef};
use crate::error::{
ConvertMetaResponseSnafu, CreateChannelSnafu, Error, IllegalGrpcClientStateSnafu,
ReadOnlyKvBackendSnafu, Result, RetryTimesExceededSnafu,
@@ -55,7 +55,7 @@ impl Client {
id,
role,
channel_manager,
ask_leader: None,
leader_provider: None,
max_retry,
}));
@@ -68,7 +68,13 @@ impl Client {
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
inner.start(urls)
}
/// Start the client with a [LeaderProvider].
pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
let mut inner = self.inner.write().await;
inner.start_with(leader_provider)
}
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
@@ -144,37 +150,40 @@ struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
leader_provider: Option<LeaderProviderRef>,
max_retry: usize,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
ensure!(
!self.is_started(),
IllegalGrpcClientStateSnafu {
err_msg: "Cluster client already started",
}
);
self.leader_provider = Some(leader_provider);
Ok(())
}
fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<Vec<_>>();
self.ask_leader = Some(AskLeader::new(
let ask_leader = AskLeader::new(
self.id,
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));
Ok(())
);
self.start_with(Arc::new(ask_leader))
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<ClusterClient<Channel>> {
@@ -188,18 +197,7 @@ impl Inner {
#[inline]
fn is_started(&self) -> bool {
self.ask_leader.is_some()
}
fn ask_leader(&self) -> Result<&AskLeader> {
ensure!(
self.is_started(),
IllegalGrpcClientStateSnafu {
err_msg: "Cluster client not start"
}
);
Ok(self.ask_leader.as_ref().unwrap())
self.leader_provider.is_some()
}
async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
@@ -208,19 +206,25 @@ impl Inner {
F: Fn(ClusterClient<Channel>) -> R,
H: Fn(&T) -> &Option<ResponseHeader>,
{
let ask_leader = self.ask_leader()?;
let Some(leader_provider) = self.leader_provider.as_ref() else {
return IllegalGrpcClientStateSnafu {
err_msg: "not started",
}
.fail();
};
let mut times = 0;
let mut last_error = None;
while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
if let Some(leader) = &leader_provider.leader() {
let client = self.make_client(leader)?;
match body_fn(client).await {
Ok(res) => {
if util::is_not_leader(get_header(&res)) {
last_error = Some(format!("{leader} is not a leader"));
warn!("Failed to {task} to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
let leader = leader_provider.ask_leader().await?;
info!("Cluster client updated to new leader addr: {leader}");
times += 1;
continue;
@@ -232,7 +236,7 @@ impl Inner {
if util::is_unreachable(&status) {
last_error = Some(status.to_string());
warn!("Failed to {task} to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
let leader = leader_provider.ask_leader().await?;
info!("Cluster client updated to new leader addr: {leader}");
times += 1;
continue;
@@ -242,7 +246,7 @@ impl Inner {
}
}
}
} else if let Err(err) = ask_leader.ask_leader().await {
} else if let Err(err) = leader_provider.ask_leader().await {
return Err(err);
}
}

View File

@@ -28,7 +28,7 @@ use tonic::transport::Channel;
use tonic::Streaming;
use crate::client::ask_leader::AskLeader;
use crate::client::Id;
use crate::client::{Id, LeaderProviderRef};
use crate::error;
use crate::error::{InvalidResponseHeaderSnafu, Result};
@@ -116,7 +116,13 @@ impl Client {
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
inner.start(urls)
}
/// Start the client with a [LeaderProvider].
pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
let mut inner = self.inner.write().await;
inner.start_with(leader_provider)
}
pub async fn ask_leader(&mut self) -> Result<String> {
@@ -136,7 +142,7 @@ struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
leader_provider: Option<LeaderProviderRef>,
max_retry: usize,
}
@@ -146,48 +152,50 @@ impl Inner {
id,
role,
channel_manager,
ask_leader: None,
leader_provider: None,
max_retry,
}
}
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Heartbeat client already started"
}
);
self.leader_provider = Some(leader_provider);
Ok(())
}
fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<Vec<_>>();
self.ask_leader = Some(AskLeader::new(
let ask_leader = AskLeader::new(
self.id,
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));
Ok(())
);
self.start_with(Arc::new(ask_leader))
}
async fn ask_leader(&self) -> Result<String> {
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Heartbeat client not start"
let Some(leader_provider) = self.leader_provider.as_ref() else {
return error::IllegalGrpcClientStateSnafu {
err_msg: "not started",
}
);
self.ask_leader.as_ref().unwrap().ask_leader().await
.fail();
};
leader_provider.ask_leader().await
}
async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
@@ -199,10 +207,10 @@ impl Inner {
);
let leader_addr = self
.ask_leader
.leader_provider
.as_ref()
.unwrap()
.get_leader()
.leader()
.context(error::NoLeaderSnafu)?;
let mut leader = self.make_client(&leader_addr)?;
@@ -262,7 +270,7 @@ impl Inner {
#[inline]
pub(crate) fn is_started(&self) -> bool {
self.ask_leader.is_some()
self.leader_provider.is_some()
}
}

View File

@@ -32,7 +32,7 @@ use tonic::transport::Channel;
use tonic::Status;
use crate::client::ask_leader::AskLeader;
use crate::client::{util, Id};
use crate::client::{util, Id, LeaderProviderRef};
use crate::error;
use crate::error::Result;
@@ -47,7 +47,7 @@ impl Client {
id,
role,
channel_manager,
ask_leader: None,
leader_provider: None,
max_retry,
}));
@@ -60,7 +60,13 @@ impl Client {
A: AsRef<[U]>,
{
let mut inner = self.inner.write().await;
inner.start(urls).await
inner.start(urls)
}
/// Start the client with a [LeaderProvider].
pub(crate) async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()> {
let mut inner = self.inner.write().await;
inner.start_with(leader_provider)
}
pub async fn submit_ddl_task(&self, req: DdlTaskRequest) -> Result<DdlTaskResponse> {
@@ -103,37 +109,40 @@ struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
leader_provider: Option<LeaderProviderRef>,
max_retry: usize,
}
impl Inner {
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
fn start_with(&mut self, leader_provider: LeaderProviderRef) -> Result<()> {
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "DDL client already started",
}
);
self.leader_provider = Some(leader_provider);
Ok(())
}
fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<Vec<_>>();
self.ask_leader = Some(AskLeader::new(
let ask_leader = AskLeader::new(
self.id,
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));
Ok(())
);
self.start_with(Arc::new(ask_leader))
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<ProcedureServiceClient<Channel>> {
@@ -150,18 +159,7 @@ impl Inner {
#[inline]
fn is_started(&self) -> bool {
self.ask_leader.is_some()
}
fn ask_leader(&self) -> Result<&AskLeader> {
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "DDL client not start"
}
);
Ok(self.ask_leader.as_ref().unwrap())
self.leader_provider.is_some()
}
async fn with_retry<T, F, R, H>(&self, task: &str, body_fn: F, get_header: H) -> Result<T>
@@ -170,19 +168,25 @@ impl Inner {
F: Fn(ProcedureServiceClient<Channel>) -> R,
H: Fn(&T) -> &Option<ResponseHeader>,
{
let ask_leader = self.ask_leader()?;
let Some(leader_provider) = self.leader_provider.as_ref() else {
return error::IllegalGrpcClientStateSnafu {
err_msg: "not started",
}
.fail();
};
let mut times = 0;
let mut last_error = None;
while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
if let Some(leader) = &leader_provider.leader() {
let client = self.make_client(leader)?;
match body_fn(client).await {
Ok(res) => {
if util::is_not_leader(get_header(&res)) {
last_error = Some(format!("{leader} is not a leader"));
warn!("Failed to {task} to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
let leader = leader_provider.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
@@ -194,7 +198,7 @@ impl Inner {
if util::is_unreachable(&status) {
last_error = Some(status.to_string());
warn!("Failed to {task} to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
let leader = leader_provider.ask_leader().await?;
info!("Procedure client updated to new leader addr: {leader}");
times += 1;
continue;
@@ -204,7 +208,7 @@ impl Inner {
}
}
}
} else if let Err(err) = ask_leader.ask_leader().await {
} else if let Err(err) = leader_provider.ask_leader().await {
return Err(err);
}
}

View File

@@ -21,7 +21,7 @@ use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::{debug, info};
use serde::{Deserialize, Serialize};
use crate::client::MetaClientBuilder;
use crate::client::{LeaderProviderRef, MetaClientBuilder};
pub mod client;
pub mod error;
@@ -76,6 +76,7 @@ pub async fn create_meta_client(
client_type: MetaClientType,
meta_client_options: &MetaClientOptions,
plugins: Option<&Plugins>,
leader_provider: Option<LeaderProviderRef>,
) -> error::Result<MetaClientRef> {
info!(
"Creating {:?} instance with Metasrv addrs {:?}",
@@ -116,9 +117,15 @@ pub async fn create_meta_client(
let mut meta_client = builder.build();
meta_client
.start(&meta_client_options.metasrv_addrs)
.await?;
if let Some(leader_provider) = leader_provider {
meta_client
.start_with(leader_provider, &meta_client_options.metasrv_addrs)
.await?;
} else {
meta_client
.start(&meta_client_options.metasrv_addrs)
.await?;
}
meta_client.ask_leader().await?;

View File

@@ -928,7 +928,6 @@ impl ErrorExt for Error {
| Error::RetryLater { .. }
| Error::RetryLaterWithSource { .. }
| Error::StartGrpc { .. }
| Error::NoEnoughAvailableNode { .. }
| Error::PublishMessage { .. }
| Error::Join { .. }
| Error::PeerUnavailable { .. }
@@ -1024,6 +1023,8 @@ impl ErrorExt for Error {
| Error::CreateMySqlPool { .. }
| Error::ConnectMySql { .. }
| Error::ParseMySqlUrl { .. } => StatusCode::Internal,
Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
}
}

View File

@@ -105,6 +105,15 @@ impl GrpcOptions {
common_telemetry::debug!("detect local IP is not supported on Android");
}
}
/// Create a [GrpcServerConfig] from self's options.
pub fn as_config(&self) -> GrpcServerConfig {
GrpcServerConfig {
max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
max_send_message_size: self.max_send_message_size.as_bytes() as usize,
tls: self.tls.clone(),
}
}
}
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";