diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index b89657abd7..c943cba994 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -16,10 +16,10 @@ use std::sync::Arc; use clap::Parser; use common_base::Plugins; -use frontend::frontend::{Frontend, FrontendOptions}; +use frontend::frontend::FrontendOptions; use frontend::grpc::GrpcOptions; use frontend::influxdb::InfluxdbOptions; -use frontend::instance::Instance; +use frontend::instance::{FrontendInstance, Instance}; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; @@ -94,12 +94,16 @@ impl StartCommand { let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?); let opts: FrontendOptions = self.try_into()?; - let instance = Instance::try_new_distributed(&opts, plugins.clone()) + let mut instance = Instance::try_new_distributed(&opts, plugins.clone()) .await .context(error::StartFrontendSnafu)?; - let mut frontend = Frontend::new(opts, instance, plugins); - frontend.start().await.context(error::StartFrontendSnafu) + instance + .build_servers(&opts, plugins) + .await + .context(error::StartFrontendSnafu)?; + + instance.start().await.context(error::StartFrontendSnafu) } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 4675f704a1..d046028709 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -21,10 +21,10 @@ use datanode::datanode::{ CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, }; use datanode::instance::InstanceRef; -use frontend::frontend::{Frontend, FrontendOptions}; +use frontend::frontend::FrontendOptions; use frontend::grpc::GrpcOptions; use frontend::influxdb::InfluxdbOptions; -use frontend::instance::Instance as FeInstance; +use frontend::instance::{FrontendInstance, Instance as FeInstance}; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; @@ -187,7 +187,7 @@ impl StartCommand { let mut datanode = Datanode::new(dn_opts.clone()) .await .context(StartDatanodeSnafu)?; - let mut frontend = build_frontend(fe_opts, plugins, datanode.get_instance()).await?; + let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?; // Start datanode instance before starting services, to avoid requests come in before internal components are started. datanode @@ -196,6 +196,11 @@ impl StartCommand { .context(StartDatanodeSnafu)?; info!("Datanode instance started"); + frontend + .build_servers(&fe_opts, plugins) + .await + .context(StartFrontendSnafu)?; + frontend.start().await.context(StartFrontendSnafu)?; Ok(()) } @@ -203,14 +208,13 @@ impl StartCommand { /// Build frontend instance in standalone mode async fn build_frontend( - fe_opts: FrontendOptions, plugins: Arc, datanode_instance: InstanceRef, -) -> Result> { +) -> Result { let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone()); frontend_instance.set_script_handler(datanode_instance); frontend_instance.set_plugins(plugins.clone()); - Ok(Frontend::new(fe_opts, frontend_instance, plugins)) + Ok(frontend_instance) } impl TryFrom for FrontendOptions { diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 3691504498..99a6b1be10 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -44,6 +44,12 @@ pub enum Error { source: servers::error::Error, }, + #[snafu(display("Failed to shutdown server, source: {}", source))] + ShutdownServer { + #[snafu(backtrace)] + source: servers::error::Error, + }, + #[snafu(display("Failed to parse address {}, source: {}", addr, source))] ParseAddr { addr: String, @@ -381,6 +387,7 @@ impl ErrorExt for Error { Error::SqlExecIntercepted { source, .. } => source.status_code(), Error::StartServer { source, .. } => source.status_code(), + Error::ShutdownServer { source, .. } => source.status_code(), Error::ParseSql { source } => source.status_code(), diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index da862511fa..b99092134f 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -12,25 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use common_base::Plugins; use meta_client::MetaClientOptions; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; use servers::Mode; -use snafu::prelude::*; -use crate::error::{self, Result}; use crate::grpc::GrpcOptions; use crate::influxdb::InfluxdbOptions; -use crate::instance::FrontendInstance; use crate::mysql::MysqlOptions; use crate::opentsdb::OpentsdbOptions; use crate::postgres::PostgresOptions; use crate::prom::PromOptions; use crate::prometheus::PrometheusOptions; -use crate::server::Services; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] @@ -64,40 +57,6 @@ impl Default for FrontendOptions { } } -pub struct Frontend -where - T: FrontendInstance, -{ - opts: FrontendOptions, - instance: Option, - plugins: Arc, -} - -impl Frontend { - pub fn new(opts: FrontendOptions, instance: T, plugins: Arc) -> Self { - Self { - opts, - instance: Some(instance), - plugins, - } - } - - pub async fn start(&mut self) -> Result<()> { - let mut instance = self - .instance - .take() - .context(error::IllegalFrontendStateSnafu { - err_msg: "Frontend instance not initialized", - })?; - instance.start().await?; - - let instance = Arc::new(instance); - - // TODO(sunng87): merge this into instance - Services::start(&self.opts, instance, self.plugins.clone()).await - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 0f4cde7f4d..6dd95414ce 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -72,6 +72,7 @@ use crate::error::{ use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler}; +use crate::server::{start_server, ServerHandlers, Services}; #[async_trait] pub trait FrontendInstance: @@ -106,6 +107,8 @@ pub struct Instance { /// plugins: this map holds extensions to customize query or auth /// behaviours. plugins: Arc, + + servers: Arc, } impl Instance { @@ -143,7 +146,8 @@ impl Instance { sql_handler: dist_instance.clone(), grpc_query_handler: dist_instance, promql_handler: None, - plugins, + plugins: plugins.clone(), + servers: Arc::new(HashMap::new()), }) } @@ -186,9 +190,21 @@ impl Instance { grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()), promql_handler: Some(dn_instance.clone()), plugins: Default::default(), + servers: Arc::new(HashMap::new()), } } + pub async fn build_servers( + &mut self, + opts: &FrontendOptions, + plugins: Arc, + ) -> Result<()> { + let servers = Services::build(opts, Arc::new(self.clone()), plugins).await?; + self.servers = Arc::new(servers); + + Ok(()) + } + #[cfg(test)] pub(crate) fn new_distributed(dist_instance: Arc) -> Self { Instance { @@ -199,6 +215,7 @@ impl Instance { grpc_query_handler: dist_instance, promql_handler: None, plugins: Default::default(), + servers: Arc::new(HashMap::new()), } } @@ -373,13 +390,24 @@ impl Instance { pub fn plugins(&self) -> Arc { self.plugins.clone() } + + pub async fn shutdown(&self) -> Result<()> { + futures::future::try_join_all(self.servers.values().map(|server| server.0.shutdown())) + .await + .context(error::ShutdownServerSnafu) + .map(|_| ()) + } } #[async_trait] impl FrontendInstance for Instance { async fn start(&mut self) -> Result<()> { // TODO(hl): Frontend init should move to here - Ok(()) + + futures::future::try_join_all(self.servers.values().map(start_server)) + .await + .context(error::StartServerSnafu) + .map(|_| ()) } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index c6d609e3f8..d6c03e2931 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -30,7 +31,6 @@ use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; use snafu::ResultExt; -use tokio::try_join; use crate::error::Error::StartServer; use crate::error::{self, Result}; @@ -41,19 +41,23 @@ use crate::prometheus::PrometheusOptions; pub(crate) struct Services; +pub type ServerHandlers = HashMap; + +pub type ServerHandler = (Box, SocketAddr); + impl Services { - pub(crate) async fn start( + pub(crate) async fn build( opts: &FrontendOptions, instance: Arc, plugins: Arc, - ) -> Result<()> + ) -> Result where T: FrontendInstance, { - info!("Starting frontend servers"); + let mut result = Vec::::with_capacity(plugins.len()); let user_provider = plugins.get::().cloned(); - let grpc_server_and_addr = if let Some(opts) = &opts.grpc_options { + if let Some(opts) = &opts.grpc_options { let grpc_addr = parse_addr(&opts.addr)?; let grpc_runtime = Arc::new( @@ -70,12 +74,10 @@ impl Services { grpc_runtime, ); - Some((Box::new(grpc_server) as _, grpc_addr)) - } else { - None + result.push((Box::new(grpc_server), grpc_addr)); }; - let mysql_server_and_addr = if let Some(opts) = &opts.mysql_options { + if let Some(opts) = &opts.mysql_options { let mysql_addr = parse_addr(&opts.addr)?; let mysql_io_runtime = Arc::new( @@ -102,13 +104,10 @@ impl Services { opts.reject_no_database.unwrap_or(false), )), ); + result.push((mysql_server, mysql_addr)); + } - Some((mysql_server, mysql_addr)) - } else { - None - }; - - let postgres_server_and_addr = if let Some(opts) = &opts.postgres_options { + if let Some(opts) = &opts.postgres_options { let pg_addr = parse_addr(&opts.addr)?; let pg_io_runtime = Arc::new( @@ -126,12 +125,12 @@ impl Services { user_provider.clone(), )) as Box; - Some((pg_server, pg_addr)) - } else { - None - }; + result.push((pg_server, pg_addr)); + } - let opentsdb_server_and_addr = if let Some(opts) = &opts.opentsdb_options { + let mut set_opentsdb_handler = false; + + if let Some(opts) = &opts.opentsdb_options { let addr = parse_addr(&opts.addr)?; let io_runtime = Arc::new( @@ -144,12 +143,11 @@ impl Services { let server = OpentsdbServer::create_server(instance.clone(), io_runtime); - Some((server, addr)) - } else { - None - }; + result.push((server, addr)); + set_opentsdb_handler = true; + } - let http_server_and_addr = if let Some(http_options) = &opts.http_options { + if let Some(http_options) = &opts.http_options { let http_addr = parse_addr(&http_options.addr)?; let mut http_server = HttpServer::new( @@ -160,7 +158,7 @@ impl Services { http_server.set_user_provider(user_provider); } - if opentsdb_server_and_addr.is_some() { + if set_opentsdb_handler { http_server.set_opentsdb_handler(instance.clone()); } if matches!( @@ -178,34 +176,24 @@ impl Services { } http_server.set_script_handler(instance.clone()); - Some((Box::new(http_server) as _, http_addr)) - } else { - None - }; + result.push((Box::new(http_server), http_addr)); + } - let prom_server_and_addr = if let Some(prom_options) = &opts.prom_options { + if let Some(prom_options) = &opts.prom_options { let prom_addr = parse_addr(&prom_options.addr)?; - let mut prom_server = PromServer::create_server(instance.clone()); + let mut prom_server = PromServer::create_server(instance); if let Some(user_provider) = user_provider { prom_server.set_user_provider(user_provider); } - Some((prom_server as _, prom_addr)) - } else { - None + result.push((prom_server, prom_addr)); }; - try_join!( - start_server(http_server_and_addr), - start_server(grpc_server_and_addr), - start_server(mysql_server_and_addr), - start_server(postgres_server_and_addr), - start_server(opentsdb_server_and_addr), - start_server(prom_server_and_addr), - ) - .context(error::StartServerSnafu)?; - Ok(()) + Ok(result + .into_iter() + .map(|(server, addr)| (server.name().to_string(), (server, addr))) + .collect()) } } @@ -213,13 +201,10 @@ fn parse_addr(addr: &str) -> Result { addr.parse().context(error::ParseAddrSnafu { addr }) } -async fn start_server( - server_and_addr: Option<(Box, SocketAddr)>, +pub async fn start_server( + server_and_addr: &(Box, SocketAddr), ) -> servers::error::Result> { - if let Some((server, addr)) = server_and_addr { - info!("Starting server at {}", addr); - server.start(addr).await.map(Some) - } else { - Ok(None) - } + let (server, addr) = server_and_addr; + info!("Starting {} at {}", server.name(), addr); + server.start(*addr).await.map(Some) } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 39b76018b9..1e09f302ec 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -65,6 +65,8 @@ impl GrpcServer { } } +pub const GRPC_SERVER: &str = "GRPC_SERVER"; + #[async_trait] impl Server for GrpcServer { async fn shutdown(&self) -> Result<()> { @@ -108,4 +110,8 @@ impl Server for GrpcServer { Ok(addr) } + + fn name(&self) -> &str { + GRPC_SERVER + } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 721210453b..219d6733a0 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -519,6 +519,8 @@ impl HttpServer { } } +pub const HTTP_SERVER: &str = "HTTP_SERVER"; + #[async_trait] impl Server for HttpServer { async fn shutdown(&self) -> Result<()> { @@ -557,6 +559,10 @@ impl Server for HttpServer { Ok(listening) } + + fn name(&self) -> &str { + HTTP_SERVER + } } /// handle error middleware diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index 4653e66023..8f3f14a339 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -200,6 +200,8 @@ impl MysqlServer { } } +pub const MYSQL_SERVER: &str = "MYSQL_SERVER"; + #[async_trait] impl Server for MysqlServer { async fn shutdown(&self) -> Result<()> { @@ -214,4 +216,8 @@ impl Server for MysqlServer { self.base_server.start_with(join_handle).await?; Ok(addr) } + + fn name(&self) -> &str { + MYSQL_SERVER + } } diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index 99504bb255..b6923e6b03 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -97,6 +97,8 @@ impl OpentsdbServer { } } +pub const OPENTSDB_SERVER: &str = "OPENTSDB_SERVER"; + #[async_trait] impl Server for OpentsdbServer { async fn shutdown(&self) -> Result<()> { @@ -117,4 +119,7 @@ impl Server for OpentsdbServer { self.base_server.start_with(join_handle).await?; Ok(addr) } + fn name(&self) -> &str { + OPENTSDB_SERVER + } } diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index fb57d94869..a9fea0ce8a 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -97,6 +97,8 @@ impl PostgresServer { } } +pub const POSTGRES_SERVER: &str = "POSTGRES_SERVER"; + #[async_trait] impl Server for PostgresServer { async fn shutdown(&self) -> Result<()> { @@ -118,4 +120,8 @@ impl Server for PostgresServer { self.base_server.start_with(join_handle).await?; Ok(addr) } + + fn name(&self) -> &str { + POSTGRES_SERVER + } } diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs index 6382f37200..a7ba2568a7 100644 --- a/src/servers/src/prom.rs +++ b/src/servers/src/prom.rs @@ -106,6 +106,8 @@ impl PromServer { } } +pub const PROM_SERVER: &str = "PROM_SERVER"; + #[async_trait] impl Server for PromServer { async fn shutdown(&self) -> Result<()> { @@ -146,6 +148,10 @@ impl Server for PromServer { Ok(listening) } + + fn name(&self) -> &str { + PROM_SERVER + } } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] diff --git a/src/servers/src/server.rs b/src/servers/src/server.rs index e2a264c96e..e91fbe5085 100644 --- a/src/servers/src/server.rs +++ b/src/servers/src/server.rs @@ -29,7 +29,7 @@ use crate::error::{self, Result}; pub(crate) type AbortableStream = Abortable; #[async_trait] -pub trait Server: Send { +pub trait Server: Send + Sync { /// Shutdown the server gracefully. async fn shutdown(&self) -> Result<()>; @@ -37,6 +37,8 @@ pub trait Server: Send { /// /// Caller should ensure `start()` is only invoked once. async fn start(&self, listening: SocketAddr) -> Result; + + fn name(&self) -> &str; } struct AcceptTask { diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index e4a6606b63..38da7fb48f 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -91,6 +91,10 @@ impl Server for MockGrpcServer { Ok(addr) } + + fn name(&self) -> &str { + "MockGrpcServer" + } } fn create_grpc_server(table: MemTable) -> Result> {