diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index b1214d72df..81d7597750 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -198,13 +198,13 @@ impl Inner { } ); - let leader = self + let leader_addr = self .ask_leader .as_ref() .unwrap() .get_leader() .context(error::NoLeaderSnafu)?; - let mut leader = self.make_client(leader)?; + let mut leader = self.make_client(&leader_addr)?; let (sender, receiver) = mpsc::channel::(128); @@ -236,7 +236,11 @@ impl Inner { .await .map_err(error::Error::from)? .context(error::CreateHeartbeatStreamSnafu)?; - info!("Success to create heartbeat stream to server: {:#?}", res); + + info!( + "Success to create heartbeat stream to server: {}, response: {:#?}", + leader_addr, res + ); Ok(( HeartbeatSender::new(self.id, self.role, sender), diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index e20bb2c4db..5fc438d174 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -17,13 +17,15 @@ use std::time::Duration; use api::v1::meta::{ procedure_service_server, DdlTaskRequest as PbDdlTaskRequest, - DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, + DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, + ResponseHeader, }; use common_meta::ddl::ExecutorContext; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; use common_meta::rpc::procedure; -use snafu::{ensure, OptionExt, ResultExt}; +use common_telemetry::warn; +use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response}; use super::GrpcResult; @@ -37,6 +39,16 @@ impl procedure_service_server::ProcedureService for Metasrv { &self, request: Request, ) -> GrpcResult { + if !self.is_leader() { + let resp = ProcedureStateResponse { + header: Some(ResponseHeader::failed(0, Error::is_not_leader())), + ..Default::default() + }; + + warn!("The current meta is not leader, but a `query procedure state` request have reached the meta. Detail: {:?}.", request); + return Ok(Response::new(resp)); + } + let QueryProcedureRequest { header, pid, .. } = request.into_inner(); let _header = header.context(error::MissingRequestHeaderSnafu)?; let pid = pid.context(error::MissingRequiredParameterSnafu { param: "pid" })?; @@ -57,6 +69,16 @@ impl procedure_service_server::ProcedureService for Metasrv { } async fn ddl(&self, request: Request) -> GrpcResult { + if !self.is_leader() { + let resp = PbDdlTaskResponse { + header: Some(ResponseHeader::failed(0, Error::is_not_leader())), + ..Default::default() + }; + + warn!("The current meta is not leader, but a `ddl` request have reached the meta. Detail: {:?}.", request); + return Ok(Response::new(resp)); + } + let PbDdlTaskRequest { header, query_context, @@ -99,12 +121,15 @@ impl procedure_service_server::ProcedureService for Metasrv { &self, request: Request, ) -> GrpcResult { - ensure!( - self.meta_peer_client().is_leader(), - error::UnexpectedSnafu { - violated: "Trying to submit a region migration procedure to non-leader meta server" - } - ); + if !self.is_leader() { + let resp = MigrateRegionResponse { + header: Some(ResponseHeader::failed(0, Error::is_not_leader())), + ..Default::default() + }; + + warn!("The current meta is not leader, but a `migrate` request have reached the meta. Detail: {:?}.", request); + return Ok(Response::new(resp)); + } let MigrateRegionRequest { header, @@ -150,6 +175,16 @@ impl procedure_service_server::ProcedureService for Metasrv { &self, request: Request, ) -> GrpcResult { + if !self.is_leader() { + let resp = ProcedureDetailResponse { + header: Some(ResponseHeader::failed(0, Error::is_not_leader())), + ..Default::default() + }; + + warn!("The current meta is not leader, but a `procedure details` request have reached the meta. Detail: {:?}.", request); + return Ok(Response::new(resp)); + } + let ProcedureDetailRequest { header } = request.into_inner(); let _header = header.context(error::MissingRequestHeaderSnafu)?; let metas = self