From 5d9faaaf390d86df44a13bb15360fafb04a9bd17 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 26 Feb 2025 16:10:40 +0800 Subject: [PATCH] fix(metasrv): reject ddl when metasrv is follower (#5599) * fix/reject-ddl-in-follower-metasrv: Add leader check and logging for gRPC requests in `procedure.rs` - Implemented leader verification for `query_procedure_state`, `ddl`, and `procedure_details` gRPC requests in `procedure.rs`. - Added logging with `warn` for requests reaching a non-leader node. - Introduced `ResponseHeader` and `Error::is_not_leader()` to handle non-leader responses. * fix/reject-ddl-in-follower-metasrv: Improve leader address handling in `heartbeat.rs` - Refactor leader address retrieval by renaming `leader` to `leader_addr` for clarity. - Update `make_client` function to use a reference to `leader_addr`. - Enhance logging to include the leader address in the success message for creating a heartbeat stream. * fmt * fix/reject-ddl-in-follower-metasrv: **Enhance Leader Check in `procedure.rs`** - Updated the leader verification logic in `procedure.rs` to return a failed `MigrateRegionResponse` when the server is not the leader. - Added logging to warn when a migrate request is received by a non-leader server. --- src/meta-client/src/client/heartbeat.rs | 10 +++-- src/meta-srv/src/service/procedure.rs | 51 +++++++++++++++++++++---- 2 files changed, 50 insertions(+), 11 deletions(-) diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index b1214d72df..81d7597750 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -198,13 +198,13 @@ impl Inner { } ); - let leader = self + let leader_addr = self .ask_leader .as_ref() .unwrap() .get_leader() .context(error::NoLeaderSnafu)?; - let mut leader = self.make_client(leader)?; + let mut leader = self.make_client(&leader_addr)?; let (sender, receiver) = mpsc::channel::(128); @@ -236,7 +236,11 @@ impl Inner { .await .map_err(error::Error::from)? .context(error::CreateHeartbeatStreamSnafu)?; - info!("Success to create heartbeat stream to server: {:#?}", res); + + info!( + "Success to create heartbeat stream to server: {}, response: {:#?}", + leader_addr, res + ); Ok(( HeartbeatSender::new(self.id, self.role, sender), diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index e20bb2c4db..5fc438d174 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -17,13 +17,15 @@ use std::time::Duration; use api::v1::meta::{ procedure_service_server, DdlTaskRequest as PbDdlTaskRequest, - DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, + DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, + ResponseHeader, }; use common_meta::ddl::ExecutorContext; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; use common_meta::rpc::procedure; -use snafu::{ensure, OptionExt, ResultExt}; +use common_telemetry::warn; +use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; use super::GrpcResult; @@ -37,6 +39,16 @@ impl procedure_service_server::ProcedureService for Metasrv { &self, request: Request, ) -> GrpcResult { + if !self.is_leader() { + let resp = ProcedureStateResponse { + header: Some(ResponseHeader::failed(0, Error::is_not_leader())), + ..Default::default() + }; + + warn!("The current meta is not leader, but a `query procedure state` request have reached the meta. Detail: {:?}.", request); + return Ok(Response::new(resp)); + } + let QueryProcedureRequest { header, pid, .. } = request.into_inner(); let _header = header.context(error::MissingRequestHeaderSnafu)?; let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?; @@ -57,6 +69,16 @@ impl procedure_service_server::ProcedureService for Metasrv { } async fn ddl(&self, request: Request) -> GrpcResult { + if !self.is_leader() { + let resp = PbDdlTaskResponse { + header: Some(ResponseHeader::failed(0, Error::is_not_leader())), + ..Default::default() + }; + + warn!("The current meta is not leader, but a `ddl` request have reached the meta. Detail: {:?}.", request); + return Ok(Response::new(resp)); + } + let PbDdlTaskRequest { header, query_context, @@ -99,12 +121,15 @@ impl procedure_service_server::ProcedureService for Metasrv { &self, request: Request, ) -> GrpcResult { - ensure!( - self.meta_peer_client().is_leader(), - error::UnexpectedSnafu { - violated: "Trying to submit a region migration procedure to non-leader meta server" - } - ); + if !self.is_leader() { + let resp = MigrateRegionResponse { + header: Some(ResponseHeader::failed(0, Error::is_not_leader())), + ..Default::default() + }; + + warn!("The current meta is not leader, but a `migrate` request have reached the meta. Detail: {:?}.", request); + return Ok(Response::new(resp)); + } let MigrateRegionRequest { header, @@ -150,6 +175,16 @@ impl procedure_service_server::ProcedureService for Metasrv { &self, request: Request, ) -> GrpcResult { + if !self.is_leader() { + let resp = ProcedureDetailResponse { + header: Some(ResponseHeader::failed(0, Error::is_not_leader())), + ..Default::default() + }; + + warn!("The current meta is not leader, but a `procedure details` request have reached the meta. Detail: {:?}.", request); + return Ok(Response::new(resp)); + } + let ProcedureDetailRequest { header } = request.into_inner(); let _header = header.context(error::MissingRequestHeaderSnafu)?; let metas = self