chore: metasrv starting not blocking (#6158)

* chore: metasrv starting not blocking

* chore: fmt

* chore: expose actual bind_addr
This commit is contained in:
discord9
2025-05-23 17:53:42 +08:00
committed by GitHub
parent 27e339f628
commit 3901863432

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::meta::cluster_server::ClusterServer;
@@ -36,7 +37,6 @@ use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
use etcd_client::Client;
use futures::future;
use servers::configurator::ConfiguratorRef;
use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
@@ -53,6 +53,7 @@ use sqlx::mysql::{MySqlConnection, MySqlPool};
use sqlx::Connection;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{oneshot, Mutex};
#[cfg(feature = "pg_kvbackend")]
use tokio_postgres::NoTls;
use tonic::codec::CompressionEncoding;
@@ -88,6 +89,12 @@ pub struct MetasrvInstance {
plugins: Plugins,
export_metrics_task: Option<ExportMetricsTask>,
/// gRPC serving state receiver. Only present if the gRPC server is started.
serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,
/// gRPC bind addr
bind_addr: Option<SocketAddr>,
}
impl MetasrvInstance {
@@ -113,6 +120,8 @@ impl MetasrvInstance {
signal_sender: None,
plugins,
export_metrics_task,
serve_state: Default::default(),
bind_addr: None,
})
}
@@ -132,21 +141,30 @@ impl MetasrvInstance {
router = configurator.config_grpc(router);
}
let metasrv = bootstrap_metasrv_with_router(&self.opts.bind_addr, router, rx);
let (serve_state_tx, serve_state_rx) = oneshot::channel();
let socket_addr =
bootstrap_metasrv_with_router(&self.opts.bind_addr, router, serve_state_tx, rx).await?;
self.bind_addr = Some(socket_addr);
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
let http_srv = async {
self.http_server
.start(addr)
.await
.context(error::StartHttpSnafu)
};
future::try_join(metasrv, http_srv).await?;
self.http_server
.start(addr)
.await
.context(error::StartHttpSnafu)?;
*self.serve_state.lock().await = Some(serve_state_rx);
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
if let Some(mut rx) = self.serve_state.lock().await.take() {
if let Ok(Err(err)) = rx.try_recv() {
common_telemetry::error!(err; "Metasrv start failed")
}
}
if let Some(signal) = &self.signal_sender {
signal
.send(())
@@ -170,30 +188,42 @@ impl MetasrvInstance {
pub fn get_inner(&self) -> &Metasrv {
&self.metasrv
}
pub fn bind_addr(&self) -> &Option<SocketAddr> {
&self.bind_addr
}
}
pub async fn bootstrap_metasrv_with_router(
bind_addr: &str,
router: Router,
mut signal: Receiver<()>,
) -> Result<()> {
serve_state_tx: oneshot::Sender<Result<()>>,
mut shutdown_rx: Receiver<()>,
) -> Result<SocketAddr> {
let listener = TcpListener::bind(bind_addr)
.await
.context(error::TcpBindSnafu { addr: bind_addr })?;
info!("gRPC server is bound to: {bind_addr}");
let real_bind_addr = listener
.local_addr()
.context(error::TcpBindSnafu { addr: bind_addr })?;
info!("gRPC server is bound to: {}", real_bind_addr);
let incoming =
TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?;
router
.serve_with_incoming_shutdown(incoming, async {
let _ = signal.recv().await;
})
.await
.context(error::StartGrpcSnafu)?;
let _handle = common_runtime::spawn_global(async move {
let result = router
.serve_with_incoming_shutdown(incoming, async {
let _ = shutdown_rx.recv().await;
})
.await
.inspect_err(|err| common_telemetry::error!(err;"Failed to start metasrv"))
.context(error::StartGrpcSnafu);
let _ = serve_state_tx.send(result);
});
Ok(())
Ok(real_bind_addr)
}
#[macro_export]