fix(metasrv): reject ddl when metasrv is follower (#5599)

* fix/reject-ddl-in-follower-metasrv:
 Add leader check and logging for gRPC requests in `procedure.rs`

 - Implemented leader verification for `query_procedure_state`, `ddl`, and `procedure_details` gRPC requests in `procedure.rs`.
 - Added logging with `warn` for requests reaching a non-leader node.
 - Introduced `ResponseHeader` and `Error::is_not_leader()` to handle non-leader responses.

* fix/reject-ddl-in-follower-metasrv:
 Improve leader address handling in `heartbeat.rs`

 - Refactor leader address retrieval by renaming `leader` to `leader_addr` for clarity.
 - Update `make_client` function to use a reference to `leader_addr`.
 - Enhance logging to include the leader address in the success message for creating a heartbeat stream.

* fmt

* fix/reject-ddl-in-follower-metasrv:
 **Enhance Leader Check in `procedure.rs`**

 - Updated the leader verification logic in `procedure.rs` to return a failed `MigrateRegionResponse` when the server is not the leader.
 - Added logging to warn when a migrate request is received by a non-leader server.
This commit is contained in:
Lei, HUANG
2025-02-26 16:10:40 +08:00
committed by GitHub
parent 538875abee
commit 5d9faaaf39
2 changed files with 50 additions and 11 deletions

View File

@@ -198,13 +198,13 @@ impl Inner {
}
);
let leader = self
let leader_addr = self
.ask_leader
.as_ref()
.unwrap()
.get_leader()
.context(error::NoLeaderSnafu)?;
let mut leader = self.make_client(leader)?;
let mut leader = self.make_client(&leader_addr)?;
let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
@@ -236,7 +236,11 @@ impl Inner {
.await
.map_err(error::Error::from)?
.context(error::CreateHeartbeatStreamSnafu)?;
info!("Success to create heartbeat stream to server: {:#?}", res);
info!(
"Success to create heartbeat stream to server: {}, response: {:#?}",
leader_addr, res
);
Ok((
HeartbeatSender::new(self.id, self.role, sender),

View File

@@ -17,13 +17,15 @@ use std::time::Duration;
use api::v1::meta::{
procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse,
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
ResponseHeader,
};
use common_meta::ddl::ExecutorContext;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
use common_meta::rpc::procedure;
use snafu::{ensure, OptionExt, ResultExt};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
use super::GrpcResult;
@@ -37,6 +39,16 @@ impl procedure_service_server::ProcedureService for Metasrv {
&self,
request: Request<QueryProcedureRequest>,
) -> GrpcResult<ProcedureStateResponse> {
if !self.is_leader() {
let resp = ProcedureStateResponse {
header: Some(ResponseHeader::failed(0, Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `query procedure state` request have reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
let QueryProcedureRequest { header, pid, .. } = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;
let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?;
@@ -57,6 +69,16 @@ impl procedure_service_server::ProcedureService for Metasrv {
}
async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
if !self.is_leader() {
let resp = PbDdlTaskResponse {
header: Some(ResponseHeader::failed(0, Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `ddl` request have reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
let PbDdlTaskRequest {
header,
query_context,
@@ -99,12 +121,15 @@ impl procedure_service_server::ProcedureService for Metasrv {
&self,
request: Request<MigrateRegionRequest>,
) -> GrpcResult<MigrateRegionResponse> {
ensure!(
self.meta_peer_client().is_leader(),
error::UnexpectedSnafu {
violated: "Trying to submit a region migration procedure to non-leader meta server"
}
);
if !self.is_leader() {
let resp = MigrateRegionResponse {
header: Some(ResponseHeader::failed(0, Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `migrate` request have reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
let MigrateRegionRequest {
header,
@@ -150,6 +175,16 @@ impl procedure_service_server::ProcedureService for Metasrv {
&self,
request: Request<ProcedureDetailRequest>,
) -> GrpcResult<ProcedureDetailResponse> {
if !self.is_leader() {
let resp = ProcedureDetailResponse {
header: Some(ResponseHeader::failed(0, Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `procedure details` request have reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
let ProcedureDetailRequest { header } = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;
let metas = self