diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index db2e269861..a92cbbb1c3 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -18,18 +18,21 @@ use api::v1::meta::store_server::StoreServer; use snafu::ResultExt; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; +use tonic::transport::server::Router; use crate::election::etcd::EtcdElection; -use crate::error; use crate::metasrv::{MetaSrv, MetaSrvOptions}; use crate::service::admin; use crate::service::store::etcd::EtcdStore; +use crate::{error, Result}; // Bootstrap the rpc server to serve incoming request -pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> { - let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?; - let election = EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?; +pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> Result<()> { + let meta_srv = make_meta_srv(opts.clone()).await?; + bootstrap_meta_srv_with_router(opts, router(meta_srv)).await +} +pub async fn bootstrap_meta_srv_with_router(opts: MetaSrvOptions, router: Router) -> Result<()> { let listener = TcpListener::bind(&opts.bind_addr) .await .context(error::TcpBindSnafu { @@ -37,18 +40,27 @@ pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> { })?; let listener = TcpListenerStream::new(listener); - let meta_srv = MetaSrv::new(opts, kv_store, None, Some(election)).await; - meta_srv.start().await; - - tonic::transport::Server::builder() - .accept_http1(true) // for admin services - .add_service(HeartbeatServer::new(meta_srv.clone())) - .add_service(RouterServer::new(meta_srv.clone())) - .add_service(StoreServer::new(meta_srv.clone())) - .add_service(admin::make_admin_service(meta_srv.clone())) + router .serve_with_incoming(listener) .await .context(error::StartGrpcSnafu)?; Ok(()) } + +pub fn router(meta_srv: MetaSrv) -> Router { + tonic::transport::Server::builder() + .accept_http1(true) // for admin services + .add_service(HeartbeatServer::new(meta_srv.clone())) + .add_service(RouterServer::new(meta_srv.clone())) + .add_service(StoreServer::new(meta_srv.clone())) + .add_service(admin::make_admin_service(meta_srv)) +} + +pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { + let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?; + let election = EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?; + let meta_srv = MetaSrv::new(opts, kv_store, None, Some(election)).await; + meta_srv.start().await; + Ok(meta_srv) +}