refactor: set the actual bound port in server handler (#3353)

* refactor: set the actual bound port so we can use port 0 in testing

* Update src/servers/src/server.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fmt

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
LFC
2024-02-23 10:49:11 +08:00
committed by GitHub
parent 7341f23019
commit 2035e7bf4c
13 changed files with 146 additions and 98 deletions

View File

@@ -43,6 +43,10 @@ impl Instance {
pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}
pub fn datanode(&self) -> &Datanode {
&self.datanode
}
}
#[async_trait]
@@ -235,6 +239,7 @@ impl StartCommand {
.with_default_grpc_server(&datanode.region_server())
.enable_http_service()
.build()
.await
.context(StartDatanodeSnafu)?;
datanode.setup_services(services);

View File

@@ -43,13 +43,17 @@ pub struct Instance {
}
impl Instance {
fn new(frontend: FeInstance) -> Self {
pub fn new(frontend: FeInstance) -> Self {
Self { frontend }
}
pub fn mut_inner(&mut self) -> &mut FeInstance {
&mut self.frontend
}
pub fn inner(&self) -> &FeInstance {
&self.frontend
}
}
#[async_trait]
@@ -271,6 +275,7 @@ impl StartCommand {
let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins)
.build()
.await
.context(StartFrontendSnafu)?;
instance
.build_servers(opts, servers)

View File

@@ -32,11 +32,11 @@ lazy_static::lazy_static! {
}
#[async_trait]
pub trait App {
pub trait App: Send {
fn name(&self) -> &str;
/// A hook for implementor to make something happened before actual startup. Defaults to no-op.
fn pre_start(&mut self) -> error::Result<()> {
async fn pre_start(&mut self) -> error::Result<()> {
Ok(())
}
@@ -46,24 +46,21 @@ pub trait App {
}
pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {
let name = app.name().to_string();
info!("Starting app: {}", app.name());
app.pre_start()?;
app.pre_start().await?;
tokio::select! {
result = app.start() => {
if let Err(err) = result {
error!(err; "Failed to start app {name}!");
}
}
_ = tokio::signal::ctrl_c() => {
if let Err(err) = app.stop().await {
error!(err; "Failed to stop app {name}!");
}
info!("Goodbye!");
}
app.start().await?;
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}
app.stop().await?;
info!("Goodbye!");
Ok(())
}

View File

@@ -441,6 +441,7 @@ impl StartCommand {
let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()
.await
.context(StartFrontendSnafu)?;
frontend
.build_servers(fe_opts, servers)

View File

@@ -14,7 +14,6 @@
//! Datanode implementation.
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
@@ -32,7 +31,6 @@ use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures::future;
use futures_util::future::try_join_all;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
@@ -45,7 +43,7 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::{start_server, ServerHandlers};
use servers::server::ServerHandlers;
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
@@ -97,7 +95,11 @@ impl Datanode {
t.start(None).context(StartServerSnafu)?
}
self.start_services().await
self.services.start_all().await.context(StartServerSnafu)
}
pub fn server_handlers(&self) -> &ServerHandlers {
&self.services
}
pub fn start_telemetry(&self) {
@@ -127,24 +129,12 @@ impl Datanode {
self.services = services;
}
/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
let _ = future::try_join_all(self.services.values().map(start_server))
.await
.context(StartServerSnafu)?;
Ok(())
}
async fn shutdown_services(&self) -> Result<()> {
let _ = future::try_join_all(self.services.values().map(|server| server.0.shutdown()))
pub async fn shutdown(&self) -> Result<()> {
self.services
.shutdown_all()
.await
.context(ShutdownServerSnafu)?;
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
// We must shutdown services first
self.shutdown_services().await?;
let _ = self.greptimedb_telemetry_task.stop().await;
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task
@@ -268,7 +258,7 @@ impl DatanodeBuilder {
.context(StartServerSnafu)?;
Ok(Datanode {
services: HashMap::new(),
services: ServerHandlers::default(),
heartbeat_task,
region_server,
greptimedb_telemetry_task,

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -27,9 +26,6 @@ use crate::config::DatanodeOptions;
use crate::error::{ParseAddrSnafu, Result};
use crate::region_server::RegionServer;
const DATANODE_GRPC_SERVICE_NAME: &str = "DATANODE_GRPC_SERVICE";
const DATANODE_HTTP_SERVICE_NAME: &str = "DATANODE_HTTP_SERVICE";
pub struct DatanodeServiceBuilder<'a> {
opts: &'a DatanodeOptions,
grpc_server: Option<GrpcServer>,
@@ -65,15 +61,15 @@ impl<'a> DatanodeServiceBuilder<'a> {
}
}
pub fn build(mut self) -> Result<ServerHandlers> {
let mut services = HashMap::new();
pub async fn build(mut self) -> Result<ServerHandlers> {
let handlers = ServerHandlers::default();
if let Some(grpc_server) = self.grpc_server.take() {
let addr: SocketAddr = self.opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &self.opts.rpc_addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
services.insert(DATANODE_GRPC_SERVICE_NAME.to_string(), handler);
handlers.insert(handler).await;
}
if self.enable_http_service {
@@ -85,10 +81,10 @@ impl<'a> DatanodeServiceBuilder<'a> {
addr: &self.opts.http.addr,
})?;
let handler: ServerHandler = (Box::new(http_server), addr);
services.insert(DATANODE_HTTP_SERVICE_NAME.to_string(), handler);
handlers.insert(handler).await;
}
Ok(services)
Ok(handlers)
}
pub fn grpc_server_builder(

View File

@@ -67,7 +67,7 @@ use servers::query_handler::{
InfluxdbLineProtocolHandler, OpenTelemetryProtocolHandler, OpentsdbProtocolHandler,
PromStoreProtocolHandler, ScriptHandler,
};
use servers::server::{start_server, ServerHandlers};
use servers::server::ServerHandlers;
use session::context::QueryContextRef;
use snafu::prelude::*;
use sql::dialect::Dialect;
@@ -115,7 +115,7 @@ pub struct Instance {
statement_executor: Arc<StatementExecutor>,
query_engine: QueryEngineRef,
plugins: Plugins,
servers: Arc<ServerHandlers>,
servers: ServerHandlers,
heartbeat_task: Option<HeartbeatTask>,
inserter: InserterRef,
deleter: DeleterRef,
@@ -198,8 +198,7 @@ impl Instance {
ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins))
.context(StartServerSnafu)?;
self.servers = Arc::new(servers);
self.servers = servers;
Ok(())
}
@@ -212,10 +211,14 @@ impl Instance {
}
pub async fn shutdown(&self) -> Result<()> {
futures::future::try_join_all(self.servers.values().map(|server| server.0.shutdown()))
self.servers
.shutdown_all()
.await
.context(error::ShutdownServerSnafu)
.map(|_| ())
}
pub fn server_handlers(&self) -> &ServerHandlers {
&self.servers
}
pub fn statement_executor(&self) -> Arc<StatementExecutor> {
@@ -248,13 +251,7 @@ impl FrontendInstance for Instance {
}
}
futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move {
info!("Starting service: {name}");
start_server(handler).await
}))
.await
.context(error::StartServerSnafu)
.map(|_| ())
self.servers.start_all().await.context(StartServerSnafu)
}
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use catalog::kvbackend::KvBackendCatalogManager;
@@ -29,6 +28,7 @@ use operator::statement::StatementExecutor;
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use query::QueryEngineFactory;
use servers::server::ServerHandlers;
use crate::error::Result;
use crate::heartbeat::HeartbeatTask;
@@ -148,7 +148,7 @@ impl FrontendBuilder {
statement_executor,
query_engine,
plugins,
servers: Arc::new(HashMap::new()),
servers: ServerHandlers::default(),
heartbeat_task: self.heartbeat_task,
inserter,
deleter,

View File

@@ -29,7 +29,7 @@ use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
use servers::server::{Server, ServerHandler, ServerHandlers};
use servers::server::{Server, ServerHandlers};
use snafu::ResultExt;
use crate::error::{self, Result, StartServerSnafu};
@@ -164,14 +164,14 @@ where
Ok(http_server)
}
pub fn build(mut self) -> Result<ServerHandlers> {
pub async fn build(mut self) -> Result<ServerHandlers> {
let opts = self.opts.clone();
let instance = self.instance.clone();
let toml = opts.to_toml()?;
let opts: FrontendOptions = opts.into();
let mut result = Vec::<ServerHandler>::new();
let handlers = ServerHandlers::default();
let user_provider = self.plugins.get::<UserProviderRef>();
@@ -179,7 +179,7 @@ where
// Always init GRPC server
let grpc_addr = parse_addr(&opts.grpc.addr)?;
let grpc_server = self.build_grpc_server(&opts)?;
result.push((Box::new(grpc_server), grpc_addr));
handlers.insert((Box::new(grpc_server), grpc_addr)).await;
}
{
@@ -187,7 +187,7 @@ where
let http_options = &opts.http;
let http_addr = parse_addr(&http_options.addr)?;
let http_server = self.build_http_server(&opts, toml)?;
result.push((Box::new(http_server), http_addr));
handlers.insert((Box::new(http_server), http_addr)).await;
}
if opts.mysql.enable {
@@ -218,7 +218,7 @@ where
opts.reject_no_database.unwrap_or(false),
)),
);
result.push((mysql_server, mysql_addr));
handlers.insert((mysql_server, mysql_addr)).await;
}
if opts.postgres.enable {
@@ -241,7 +241,7 @@ where
user_provider.clone(),
)) as Box<dyn Server>;
result.push((pg_server, pg_addr));
handlers.insert((pg_server, pg_addr)).await;
}
if opts.opentsdb.enable {
@@ -259,13 +259,10 @@ where
let server = OpentsdbServer::create_server(instance.clone(), io_runtime);
result.push((server, addr));
handlers.insert((server, addr)).await;
}
Ok(result
.into_iter()
.map(|(server, addr)| (server.name().to_string(), (server, addr)))
.collect())
Ok(handlers)
}
}

View File

@@ -26,6 +26,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_telemetry::info;
use etcd_client::Client;
use futures::future;
use servers::configurator::ConfiguratorRef;
use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
@@ -33,7 +34,6 @@ use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tonic::transport::server::{Router, TcpIncoming};
@@ -110,12 +110,14 @@ impl MetaSrvInstance {
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
let http_srv = self.http_srv.start(addr);
select! {
v = meta_srv => v?,
v = http_srv => v.map(|_| ()).context(error::StartHttpSnafu)?,
}
let http_srv = async {
self.http_srv
.start(addr)
.await
.map(|_| ())
.context(error::StartHttpSnafu)
};
future::try_join(meta_srv, http_srv).await?;
Ok(())
}

View File

@@ -78,7 +78,7 @@ pub struct MetaSrvBuilder {
lock: Option<DistLockRef>,
datanode_manager: Option<DatanodeManagerRef>,
plugins: Option<Plugins>,
table_metadata_allocator: Option<TableMetadataAllocator>,
table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
impl MetaSrvBuilder {
@@ -150,7 +150,7 @@ impl MetaSrvBuilder {
pub fn table_metadata_allocator(
mut self,
table_metadata_allocator: TableMetadataAllocator,
table_metadata_allocator: TableMetadataAllocatorRef,
) -> Self {
self.table_metadata_allocator = Some(table_metadata_allocator);
self
@@ -211,7 +211,7 @@ impl MetaSrvBuilder {
options.wal.clone(),
kv_backend.clone(),
));
let table_metadata_allocator = Arc::new(table_metadata_allocator.unwrap_or_else(|| {
let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
let sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
@@ -222,13 +222,13 @@ impl MetaSrvBuilder {
selector_ctx.clone(),
selector.clone(),
));
TableMetadataAllocator::with_peer_allocator(
Arc::new(TableMetadataAllocator::with_peer_allocator(
sequence,
wal_options_allocator.clone(),
table_metadata_manager.table_name_manager().clone(),
peer_allocator,
)
}));
))
});
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());

View File

@@ -51,7 +51,7 @@ use tower_http::trace::TraceLayer;
use self::authorize::AuthState;
use crate::configurator::ConfiguratorRef;
use crate::error::{AlreadyStartedSnafu, Error, Result, StartHttpSnafu, ToJsonSnafu};
use crate::error::{AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu};
use crate::http::arrow_result::ArrowResponse;
use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
@@ -797,9 +797,15 @@ impl Server for HttpServer {
let listening = server.local_addr();
info!("HTTP server is bound to {}", listening);
let graceful = server.with_graceful_shutdown(rx.map(drop));
graceful.await.context(StartHttpSnafu)?;
common_runtime::spawn_bg(async move {
if let Err(e) = server
.with_graceful_shutdown(rx.map(drop))
.await
.context(HyperSnafu)
{
error!(e; "Failed to shutdown http server");
}
});
Ok(listening)
}

View File

@@ -19,9 +19,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::{error, info};
use futures::future::{AbortHandle, AbortRegistration, Abortable};
use futures::future::{try_join_all, AbortHandle, AbortRegistration, Abortable};
use snafu::{ensure, ResultExt};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::TcpListenerStream;
@@ -29,14 +29,66 @@ use crate::error::{self, Result};
pub(crate) type AbortableStream = Abortable<TcpListenerStream>;
pub type ServerHandlers = HashMap<String, ServerHandler>;
pub type ServerHandler = (Box<dyn Server>, SocketAddr);
pub async fn start_server(server_handler: &ServerHandler) -> Result<Option<SocketAddr>> {
let (server, addr) = server_handler;
info!("Starting {} at {}", server.name(), addr);
server.start(*addr).await.map(Some)
/// [ServerHandlers] is used to manage the lifecycle of all the services like http or grpc in the GreptimeDB server.
#[derive(Clone, Default)]
pub struct ServerHandlers {
handlers: Arc<RwLock<HashMap<String, ServerHandler>>>,
}
impl ServerHandlers {
pub fn new() -> Self {
Self {
handlers: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn insert(&self, handler: ServerHandler) {
let mut handlers = self.handlers.write().await;
handlers.insert(handler.0.name().to_string(), handler);
}
/// Finds the __actual__ bound address of the service by its name.
///
/// This is useful in testing. We can configure the service to bind to port 0 first, then start
/// the server to get the real bound port number. This way we avoid doing careful assignment of
/// the port number to the service in the test.
///
/// Note that the address is guaranteed to be correct only after the `start_all` method is
/// successfully invoked. Otherwise you may find the address to be what you configured before.
pub async fn addr(&self, name: &str) -> Option<SocketAddr> {
let handlers = self.handlers.read().await;
handlers.get(name).map(|x| x.1)
}
/// Starts all the managed services. It will block until all the services are started.
/// And it will set the actual bound address to the service.
pub async fn start_all(&self) -> Result<()> {
let mut handlers = self.handlers.write().await;
try_join_all(handlers.values_mut().map(|(server, addr)| async move {
let bind_addr = server.start(*addr).await?;
*addr = bind_addr;
info!("Service {} is started at {}", server.name(), bind_addr);
Ok::<(), error::Error>(())
}))
.await?;
Ok(())
}
/// Shutdown all the managed services. It will block until all the services are shutdown.
pub async fn shutdown_all(&self) -> Result<()> {
// Even though the `shutdown` method in server does not require mut self, we still acquire
// write lock to pair with `start_all` method.
let handlers = self.handlers.write().await;
try_join_all(handlers.values().map(|(server, _)| async move {
server.shutdown().await?;
info!("Service {} is shutdown!", server.name());
Ok::<(), error::Error>(())
}))
.await?;
Ok(())
}
}
#[async_trait]