From 3901863432fe14875c27abb0b5ea35f305f9ac27 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 23 May 2025 17:53:42 +0800 Subject: [PATCH] chore: metasrv starting not blocking (#6158) * chore: metasrv starting not blocking * chore: fmt * chore: expose actual bind_addr --- src/meta-srv/src/bootstrap.rs | 68 +++++++++++++++++++++++++---------- 1 file changed, 49 insertions(+), 19 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index c669e2d546..237af0612b 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::net::SocketAddr; use std::sync::Arc; use api::v1::meta::cluster_server::ClusterServer; @@ -36,7 +37,6 @@ use common_telemetry::info; #[cfg(feature = "pg_kvbackend")] use deadpool_postgres::{Config, Runtime}; use etcd_client::Client; -use futures::future; use servers::configurator::ConfiguratorRef; use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; @@ -53,6 +53,7 @@ use sqlx::mysql::{MySqlConnection, MySqlPool}; use sqlx::Connection; use tokio::net::TcpListener; use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::{oneshot, Mutex}; #[cfg(feature = "pg_kvbackend")] use tokio_postgres::NoTls; use tonic::codec::CompressionEncoding; @@ -88,6 +89,12 @@ pub struct MetasrvInstance { plugins: Plugins, export_metrics_task: Option, + + /// gRPC serving state receiver. Only present if the gRPC server is started. + serve_state: Arc>>>>, + + /// gRPC bind addr + bind_addr: Option, } impl MetasrvInstance { @@ -113,6 +120,8 @@ impl MetasrvInstance { signal_sender: None, plugins, export_metrics_task, + serve_state: Default::default(), + bind_addr: None, }) } @@ -132,21 +141,30 @@ impl MetasrvInstance { router = configurator.config_grpc(router); } - let metasrv = bootstrap_metasrv_with_router(&self.opts.bind_addr, router, rx); + let (serve_state_tx, serve_state_rx) = oneshot::channel(); + + let socket_addr = + bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?; + self.bind_addr = Some(socket_addr); + let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu { addr: &self.opts.http.addr, })?; - let http_srv = async { - self.http_server - .start(addr) - .await - .context(error::StartHttpSnafu) - }; - future::try_join(metasrv, http_srv).await?; + self.http_server + .start(addr) + .await + .context(error::StartHttpSnafu)?; + + *self.serve_state.lock().await = Some(serve_state_rx); Ok(()) } pub async fn shutdown(&self) -> Result<()> { + if let Some(mut rx) = self.serve_state.lock().await.take() { + if let Ok(Err(err)) = rx.try_recv() { + common_telemetry::error!(err; "Metasrv start failed") + } + } if let Some(signal) = &self.signal_sender { signal .send(()) @@ -170,30 +188,42 @@ impl MetasrvInstance { pub fn get_inner(&self) -> &Metasrv { &self.metasrv } + pub fn bind_addr(&self) -> &Option { + &self.bind_addr + } } pub async fn bootstrap_metasrv_with_router( bind_addr: &str, router: Router, - mut signal: Receiver<()>, -) -> Result<()> { + serve_state_tx: oneshot::Sender>, + mut shutdown_rx: Receiver<()>, +) -> Result { let listener = TcpListener::bind(bind_addr) .await .context(error::TcpBindSnafu { addr: bind_addr })?; - info!("gRPC server is bound to: {bind_addr}"); + let real_bind_addr = listener + .local_addr() + .context(error::TcpBindSnafu { addr: bind_addr })?; + + info!("gRPC server is bound to: {}", real_bind_addr); let incoming = TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?; - router - .serve_with_incoming_shutdown(incoming, async { - let _ = signal.recv().await; - }) - .await - .context(error::StartGrpcSnafu)?; + let _handle = common_runtime::spawn_global(async move { + let result = router + .serve_with_incoming_shutdown(incoming, async { + let _ = shutdown_rx.recv().await; + }) + .await + .inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv")) + .context(error::StartGrpcSnafu); + let _ = serve_state_tx.send(result); + }); - Ok(()) + Ok(real_bind_addr) } #[macro_export]