feat: add shutdown for frontend (#1161)

This commit is contained in:
Weny Xu
2023-03-13 17:59:36 +08:00
committed by GitHub
parent 8a83de4ea5
commit c7f114c8fa
14 changed files with 136 additions and 108 deletions

View File

@@ -16,10 +16,10 @@ use std::sync::Arc;
use clap::Parser;
use common_base::Plugins;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::Instance;
use frontend::instance::{FrontendInstance, Instance};
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
@@ -94,12 +94,16 @@ impl StartCommand {
let plugins = Arc::new(load_frontend_plugins(&self.user_provider)?);
let opts: FrontendOptions = self.try_into()?;
let instance = Instance::try_new_distributed(&opts, plugins.clone())
let mut instance = Instance::try_new_distributed(&opts, plugins.clone())
.await
.context(error::StartFrontendSnafu)?;
let mut frontend = Frontend::new(opts, instance, plugins);
frontend.start().await.context(error::StartFrontendSnafu)
instance
.build_servers(&opts, plugins)
.await
.context(error::StartFrontendSnafu)?;
instance.start().await.context(error::StartFrontendSnafu)
}
}

View File

@@ -21,10 +21,10 @@ use datanode::datanode::{
CompactionConfig, Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig,
};
use datanode::instance::InstanceRef;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::frontend::FrontendOptions;
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::Instance as FeInstance;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
@@ -187,7 +187,7 @@ impl StartCommand {
let mut datanode = Datanode::new(dn_opts.clone())
.await
.context(StartDatanodeSnafu)?;
let mut frontend = build_frontend(fe_opts, plugins, datanode.get_instance()).await?;
let mut frontend = build_frontend(plugins.clone(), datanode.get_instance()).await?;
// Start datanode instance before starting services, to avoid requests come in before internal components are started.
datanode
@@ -196,6 +196,11 @@ impl StartCommand {
.context(StartDatanodeSnafu)?;
info!("Datanode instance started");
frontend
.build_servers(&fe_opts, plugins)
.await
.context(StartFrontendSnafu)?;
frontend.start().await.context(StartFrontendSnafu)?;
Ok(())
}
@@ -203,14 +208,13 @@ impl StartCommand {
/// Build frontend instance in standalone mode
async fn build_frontend(
fe_opts: FrontendOptions,
plugins: Arc<Plugins>,
datanode_instance: InstanceRef,
) -> Result<Frontend<FeInstance>> {
) -> Result<FeInstance> {
let mut frontend_instance = FeInstance::new_standalone(datanode_instance.clone());
frontend_instance.set_script_handler(datanode_instance);
frontend_instance.set_plugins(plugins.clone());
Ok(Frontend::new(fe_opts, frontend_instance, plugins))
Ok(frontend_instance)
}
impl TryFrom<StartCommand> for FrontendOptions {

View File

@@ -44,6 +44,12 @@ pub enum Error {
source: servers::error::Error,
},
#[snafu(display("Failed to shutdown server, source: {}", source))]
ShutdownServer {
#[snafu(backtrace)]
source: servers::error::Error,
},
#[snafu(display("Failed to parse address {}, source: {}", addr, source))]
ParseAddr {
addr: String,
@@ -381,6 +387,7 @@ impl ErrorExt for Error {
Error::SqlExecIntercepted { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Error::ShutdownServer { source, .. } => source.status_code(),
Error::ParseSql { source } => source.status_code(),

View File

@@ -12,25 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_base::Plugins;
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::Mode;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::grpc::GrpcOptions;
use crate::influxdb::InfluxdbOptions;
use crate::instance::FrontendInstance;
use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
use crate::prom::PromOptions;
use crate::prometheus::PrometheusOptions;
use crate::server::Services;
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
@@ -64,40 +57,6 @@ impl Default for FrontendOptions {
}
}
pub struct Frontend<T>
where
T: FrontendInstance,
{
opts: FrontendOptions,
instance: Option<T>,
plugins: Arc<Plugins>,
}
impl<T: FrontendInstance> Frontend<T> {
pub fn new(opts: FrontendOptions, instance: T, plugins: Arc<Plugins>) -> Self {
Self {
opts,
instance: Some(instance),
plugins,
}
}
pub async fn start(&mut self) -> Result<()> {
let mut instance = self
.instance
.take()
.context(error::IllegalFrontendStateSnafu {
err_msg: "Frontend instance not initialized",
})?;
instance.start().await?;
let instance = Arc::new(instance);
// TODO(sunng87): merge this into instance
Services::start(&self.opts, instance, self.plugins.clone()).await
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -72,6 +72,7 @@ use crate::error::{
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler};
use crate::server::{start_server, ServerHandlers, Services};
#[async_trait]
pub trait FrontendInstance:
@@ -106,6 +107,8 @@ pub struct Instance {
/// plugins: this map holds extensions to customize query or auth
/// behaviours.
plugins: Arc<Plugins>,
servers: Arc<ServerHandlers>,
}
impl Instance {
@@ -143,7 +146,8 @@ impl Instance {
sql_handler: dist_instance.clone(),
grpc_query_handler: dist_instance,
promql_handler: None,
plugins,
plugins: plugins.clone(),
servers: Arc::new(HashMap::new()),
})
}
@@ -186,9 +190,21 @@ impl Instance {
grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()),
promql_handler: Some(dn_instance.clone()),
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
}
}
pub async fn build_servers(
&mut self,
opts: &FrontendOptions,
plugins: Arc<Plugins>,
) -> Result<()> {
let servers = Services::build(opts, Arc::new(self.clone()), plugins).await?;
self.servers = Arc::new(servers);
Ok(())
}
#[cfg(test)]
pub(crate) fn new_distributed(dist_instance: Arc<DistInstance>) -> Self {
Instance {
@@ -199,6 +215,7 @@ impl Instance {
grpc_query_handler: dist_instance,
promql_handler: None,
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
}
}
@@ -373,13 +390,24 @@ impl Instance {
pub fn plugins(&self) -> Arc<Plugins> {
self.plugins.clone()
}
pub async fn shutdown(&self) -> Result<()> {
futures::future::try_join_all(self.servers.values().map(|server| server.0.shutdown()))
.await
.context(error::ShutdownServerSnafu)
.map(|_| ())
}
}
#[async_trait]
impl FrontendInstance for Instance {
async fn start(&mut self) -> Result<()> {
// TODO(hl): Frontend init should move to here
Ok(())
futures::future::try_join_all(self.servers.values().map(start_server))
.await
.context(error::StartServerSnafu)
.map(|_| ())
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -30,7 +31,6 @@ use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
use snafu::ResultExt;
use tokio::try_join;
use crate::error::Error::StartServer;
use crate::error::{self, Result};
@@ -41,19 +41,23 @@ use crate::prometheus::PrometheusOptions;
pub(crate) struct Services;
pub type ServerHandlers = HashMap<String, ServerHandler>;
pub type ServerHandler = (Box<dyn Server>, SocketAddr);
impl Services {
pub(crate) async fn start<T>(
pub(crate) async fn build<T>(
opts: &FrontendOptions,
instance: Arc<T>,
plugins: Arc<Plugins>,
) -> Result<()>
) -> Result<ServerHandlers>
where
T: FrontendInstance,
{
info!("Starting frontend servers");
let mut result = Vec::<ServerHandler>::with_capacity(plugins.len());
let user_provider = plugins.get::<UserProviderRef>().cloned();
let grpc_server_and_addr = if let Some(opts) = &opts.grpc_options {
if let Some(opts) = &opts.grpc_options {
let grpc_addr = parse_addr(&opts.addr)?;
let grpc_runtime = Arc::new(
@@ -70,12 +74,10 @@ impl Services {
grpc_runtime,
);
Some((Box::new(grpc_server) as _, grpc_addr))
} else {
None
result.push((Box::new(grpc_server), grpc_addr));
};
let mysql_server_and_addr = if let Some(opts) = &opts.mysql_options {
if let Some(opts) = &opts.mysql_options {
let mysql_addr = parse_addr(&opts.addr)?;
let mysql_io_runtime = Arc::new(
@@ -102,13 +104,10 @@ impl Services {
opts.reject_no_database.unwrap_or(false),
)),
);
result.push((mysql_server, mysql_addr));
}
Some((mysql_server, mysql_addr))
} else {
None
};
let postgres_server_and_addr = if let Some(opts) = &opts.postgres_options {
if let Some(opts) = &opts.postgres_options {
let pg_addr = parse_addr(&opts.addr)?;
let pg_io_runtime = Arc::new(
@@ -126,12 +125,12 @@ impl Services {
user_provider.clone(),
)) as Box<dyn Server>;
Some((pg_server, pg_addr))
} else {
None
};
result.push((pg_server, pg_addr));
}
let opentsdb_server_and_addr = if let Some(opts) = &opts.opentsdb_options {
let mut set_opentsdb_handler = false;
if let Some(opts) = &opts.opentsdb_options {
let addr = parse_addr(&opts.addr)?;
let io_runtime = Arc::new(
@@ -144,12 +143,11 @@ impl Services {
let server = OpentsdbServer::create_server(instance.clone(), io_runtime);
Some((server, addr))
} else {
None
};
result.push((server, addr));
set_opentsdb_handler = true;
}
let http_server_and_addr = if let Some(http_options) = &opts.http_options {
if let Some(http_options) = &opts.http_options {
let http_addr = parse_addr(&http_options.addr)?;
let mut http_server = HttpServer::new(
@@ -160,7 +158,7 @@ impl Services {
http_server.set_user_provider(user_provider);
}
if opentsdb_server_and_addr.is_some() {
if set_opentsdb_handler {
http_server.set_opentsdb_handler(instance.clone());
}
if matches!(
@@ -178,34 +176,24 @@ impl Services {
}
http_server.set_script_handler(instance.clone());
Some((Box::new(http_server) as _, http_addr))
} else {
None
};
result.push((Box::new(http_server), http_addr));
}
let prom_server_and_addr = if let Some(prom_options) = &opts.prom_options {
if let Some(prom_options) = &opts.prom_options {
let prom_addr = parse_addr(&prom_options.addr)?;
let mut prom_server = PromServer::create_server(instance.clone());
let mut prom_server = PromServer::create_server(instance);
if let Some(user_provider) = user_provider {
prom_server.set_user_provider(user_provider);
}
Some((prom_server as _, prom_addr))
} else {
None
result.push((prom_server, prom_addr));
};
try_join!(
start_server(http_server_and_addr),
start_server(grpc_server_and_addr),
start_server(mysql_server_and_addr),
start_server(postgres_server_and_addr),
start_server(opentsdb_server_and_addr),
start_server(prom_server_and_addr),
)
.context(error::StartServerSnafu)?;
Ok(())
Ok(result
.into_iter()
.map(|(server, addr)| (server.name().to_string(), (server, addr)))
.collect())
}
}
@@ -213,13 +201,10 @@ fn parse_addr(addr: &str) -> Result<SocketAddr> {
addr.parse().context(error::ParseAddrSnafu { addr })
}
async fn start_server(
server_and_addr: Option<(Box<dyn Server>, SocketAddr)>,
pub async fn start_server(
server_and_addr: &(Box<dyn Server>, SocketAddr),
) -> servers::error::Result<Option<SocketAddr>> {
if let Some((server, addr)) = server_and_addr {
info!("Starting server at {}", addr);
server.start(addr).await.map(Some)
} else {
Ok(None)
}
let (server, addr) = server_and_addr;
info!("Starting {} at {}", server.name(), addr);
server.start(*addr).await.map(Some)
}

View File

@@ -65,6 +65,8 @@ impl GrpcServer {
}
}
pub const GRPC_SERVER: &str = "GRPC_SERVER";
#[async_trait]
impl Server for GrpcServer {
async fn shutdown(&self) -> Result<()> {
@@ -108,4 +110,8 @@ impl Server for GrpcServer {
Ok(addr)
}
fn name(&self) -> &str {
GRPC_SERVER
}
}

View File

@@ -519,6 +519,8 @@ impl HttpServer {
}
}
pub const HTTP_SERVER: &str = "HTTP_SERVER";
#[async_trait]
impl Server for HttpServer {
async fn shutdown(&self) -> Result<()> {
@@ -557,6 +559,10 @@ impl Server for HttpServer {
Ok(listening)
}
fn name(&self) -> &str {
HTTP_SERVER
}
}
/// handle error middleware

View File

@@ -200,6 +200,8 @@ impl MysqlServer {
}
}
pub const MYSQL_SERVER: &str = "MYSQL_SERVER";
#[async_trait]
impl Server for MysqlServer {
async fn shutdown(&self) -> Result<()> {
@@ -214,4 +216,8 @@ impl Server for MysqlServer {
self.base_server.start_with(join_handle).await?;
Ok(addr)
}
fn name(&self) -> &str {
MYSQL_SERVER
}
}

View File

@@ -97,6 +97,8 @@ impl OpentsdbServer {
}
}
pub const OPENTSDB_SERVER: &str = "OPENTSDB_SERVER";
#[async_trait]
impl Server for OpentsdbServer {
async fn shutdown(&self) -> Result<()> {
@@ -117,4 +119,7 @@ impl Server for OpentsdbServer {
self.base_server.start_with(join_handle).await?;
Ok(addr)
}
fn name(&self) -> &str {
OPENTSDB_SERVER
}
}

View File

@@ -97,6 +97,8 @@ impl PostgresServer {
}
}
pub const POSTGRES_SERVER: &str = "POSTGRES_SERVER";
#[async_trait]
impl Server for PostgresServer {
async fn shutdown(&self) -> Result<()> {
@@ -118,4 +120,8 @@ impl Server for PostgresServer {
self.base_server.start_with(join_handle).await?;
Ok(addr)
}
fn name(&self) -> &str {
POSTGRES_SERVER
}
}

View File

@@ -106,6 +106,8 @@ impl PromServer {
}
}
pub const PROM_SERVER: &str = "PROM_SERVER";
#[async_trait]
impl Server for PromServer {
async fn shutdown(&self) -> Result<()> {
@@ -146,6 +148,10 @@ impl Server for PromServer {
Ok(listening)
}
fn name(&self) -> &str {
PROM_SERVER
}
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]

View File

@@ -29,7 +29,7 @@ use crate::error::{self, Result};
pub(crate) type AbortableStream = Abortable<TcpListenerStream>;
#[async_trait]
pub trait Server: Send {
pub trait Server: Send + Sync {
/// Shutdown the server gracefully.
async fn shutdown(&self) -> Result<()>;
@@ -37,6 +37,8 @@ pub trait Server: Send {
///
/// Caller should ensure `start()` is only invoked once.
async fn start(&self, listening: SocketAddr) -> Result<SocketAddr>;
fn name(&self) -> &str;
}
struct AcceptTask {

View File

@@ -91,6 +91,10 @@ impl Server for MockGrpcServer {
Ok(addr)
}
fn name(&self) -> &str {
"MockGrpcServer"
}
}
fn create_grpc_server(table: MemTable) -> Result<Arc<dyn Server>> {