From 9fd2d4e8db5a00d908780872c4879e877ed7370e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 3 Nov 2022 17:24:15 +0800 Subject: [PATCH] fix: detach grpc tasks to another runtime (#376) * fix: detach grpc tasks to another runtime Signed-off-by: Ruihang Xia * add runtime size options Signed-off-by: Ruihang Xia * group an obj-req into one task Signed-off-by: Ruihang Xia * make nitpicking CRer happy Signed-off-by: Ruihang Xia Signed-off-by: Ruihang Xia --- config/datanode.example.toml | 1 + src/cmd/src/frontend.rs | 11 ++++++++-- src/datanode/src/datanode.rs | 6 +++-- src/datanode/src/server.rs | 9 +++++++- src/datanode/src/tests/grpc_test.rs | 11 +++++++++- src/frontend/src/frontend.rs | 5 +++-- src/frontend/src/grpc.rs | 16 ++++++++++++++ src/frontend/src/lib.rs | 1 + src/frontend/src/server.rs | 14 +++++++++--- src/frontend/src/tests.rs | 11 +++++++++- src/servers/src/grpc.rs | 16 ++++++++++++-- src/servers/src/grpc/handler.rs | 34 ++++++++++++++++++++++++----- 12 files changed, 115 insertions(+), 20 deletions(-) create mode 100644 src/frontend/src/grpc.rs diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 0381ea3a07..866e806664 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -1,6 +1,7 @@ http_addr = '0.0.0.0:3000' rpc_addr = '0.0.0.0:3001' wal_dir = '/tmp/greptimedb/wal' +rpc_runtime_size = 8 mysql_addr = '0.0.0.0:3306' mysql_runtime_size = 4 diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 33050b45c1..f8c6af218a 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -1,5 +1,6 @@ use clap::Parser; use frontend::frontend::{Frontend, FrontendOptions}; +use frontend::grpc::GrpcOptions; use frontend::influxdb::InfluxdbOptions; use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; @@ -74,7 +75,10 @@ impl TryFrom for FrontendOptions { opts.http_addr = Some(addr); } if let Some(addr) = cmd.grpc_addr { - opts.grpc_addr = Some(addr); + 
opts.grpc_options = Some(GrpcOptions { + addr, + ..Default::default() + }); } if let Some(addr) = cmd.mysql_addr { opts.mysql_options = Some(MysqlOptions { @@ -130,7 +134,10 @@ mod tests { ); let default_opts = FrontendOptions::default(); - assert_eq!(opts.grpc_addr, default_opts.grpc_addr); + assert_eq!( + opts.grpc_options.unwrap().addr, + default_opts.grpc_options.unwrap().addr + ); assert_eq!( opts.mysql_options.as_ref().unwrap().runtime_size, default_opts.mysql_options.as_ref().unwrap().runtime_size diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index b7785d8d30..329db00ae2 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -24,10 +24,11 @@ impl Default for ObjectStoreConfig { pub struct DatanodeOptions { pub http_addr: String, pub rpc_addr: String, + pub rpc_runtime_size: usize, pub mysql_addr: String, - pub mysql_runtime_size: u32, + pub mysql_runtime_size: usize, pub postgres_addr: String, - pub postgres_runtime_size: u32, + pub postgres_runtime_size: usize, pub wal_dir: String, pub storage: ObjectStoreConfig, } @@ -37,6 +38,7 @@ impl Default for DatanodeOptions { Self { http_addr: "0.0.0.0:3000".to_string(), rpc_addr: "0.0.0.0:3001".to_string(), + rpc_runtime_size: 8, mysql_addr: "0.0.0.0:3306".to_string(), mysql_runtime_size: 2, postgres_addr: "0.0.0.0:5432".to_string(), diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 7a9ac13b6c..9e17a6ebb8 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -40,9 +40,16 @@ impl Services { .build() .context(error::RuntimeResourceSnafu)?, ); + let grpc_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.rpc_runtime_size as usize) + .thread_name("grpc-io-handlers") + .build() + .context(error::RuntimeResourceSnafu)?, + ); Ok(Self { http_server: HttpServer::new(instance.clone()), - grpc_server: GrpcServer::new(instance.clone(), instance.clone()), + grpc_server: GrpcServer::new(instance.clone(), 
instance.clone(), grpc_runtime), mysql_server: MysqlServer::create_server(instance.clone(), mysql_io_runtime), postgres_server: Box::new(PostgresServer::new(instance, postgres_io_runtime)), }) diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index f5a2aa79a6..74e25658ba 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -11,6 +11,7 @@ use api::v1::{ }; use client::admin::Admin; use client::{Client, Database, ObjectResult}; +use common_runtime::Builder as RuntimeBuilder; use servers::grpc::GrpcServer; use servers::server::Server; @@ -27,9 +28,17 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc().unwrap(); grpc_server_clone.start(addr).await.unwrap() diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index bde429336e..47fe7bf405 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use snafu::prelude::*; use crate::error::{self, Result}; +use crate::grpc::GrpcOptions; use crate::influxdb::InfluxdbOptions; use crate::instance::Instance; use crate::mysql::MysqlOptions; @@ -15,7 +16,7 @@ use crate::server::Services; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct FrontendOptions { pub http_addr: Option, - pub grpc_addr: Option, + pub grpc_options: Option, pub mysql_options: Option, pub postgres_options: Option, pub opentsdb_options: Option, @@ -27,7 +28,7 @@ impl Default for FrontendOptions { fn default() -> Self { Self { http_addr: Some("0.0.0.0:4000".to_string()), - grpc_addr: Some("0.0.0.0:4001".to_string()), + grpc_options: Some(GrpcOptions::default()), mysql_options: Some(MysqlOptions::default()), postgres_options: Some(PostgresOptions::default()), opentsdb_options: Some(OpentsdbOptions::default()), diff --git a/src/frontend/src/grpc.rs b/src/frontend/src/grpc.rs new file mode 100644 index 0000000000..c85244d4f7 --- /dev/null +++ 
b/src/frontend/src/grpc.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GrpcOptions { + pub addr: String, + pub runtime_size: usize, +} + +impl Default for GrpcOptions { + fn default() -> Self { + Self { + addr: "0.0.0.0:4001".to_string(), + runtime_size: 8, + } + } +} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 85521c4475..63bae536a8 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -2,6 +2,7 @@ pub mod error; pub mod frontend; +pub mod grpc; pub mod influxdb; pub mod instance; pub mod mysql; diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 2ee90c2dd0..882c671bb4 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -21,10 +21,18 @@ pub(crate) struct Services; impl Services { pub(crate) async fn start(opts: &FrontendOptions, instance: InstanceRef) -> Result<()> { - let grpc_server_and_addr = if let Some(grpc_addr) = &opts.grpc_addr { - let grpc_addr = parse_addr(grpc_addr)?; + let grpc_server_and_addr = if let Some(opts) = &opts.grpc_options { + let grpc_addr = parse_addr(&opts.addr)?; - let grpc_server = GrpcServer::new(instance.clone(), instance.clone()); + let grpc_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.runtime_size) + .thread_name("grpc-handlers") + .build() + .context(error::RuntimeResourceSnafu)?, + ); + + let grpc_server = GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime); Some((Box::new(grpc_server) as _, grpc_addr)) } else { diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index a1317d7bbf..11763baf91 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use client::Client; use common_grpc::channel_manager::ChannelManager; +use common_runtime::Builder as RuntimeBuilder; use datanode::instance::Instance as DatanodeInstance; use servers::grpc::GrpcServer; use 
tonic::transport::Server; @@ -22,10 +23,18 @@ pub(crate) async fn create_frontend_instance() -> Arc { let (client, server) = tokio::io::duplex(1024); + let runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(2) + .thread_name("grpc-handlers") + .build() + .unwrap(), + ); + // create a mock datanode grpc service, see example here: // https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs let datanode_service = - GrpcServer::new(datanode_instance.clone(), datanode_instance).create_service(); + GrpcServer::new(datanode_instance.clone(), datanode_instance, runtime).create_service(); tokio::spawn(async move { Server::builder() .add_service(datanode_service) diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index e000c580f5..ddddeab180 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -1,9 +1,11 @@ pub mod handler; use std::net::SocketAddr; +use std::sync::Arc; use api::v1::{greptime_server, BatchRequest, BatchResponse}; use async_trait::async_trait; +use common_runtime::Runtime; use common_telemetry::logging::info; use futures::FutureExt; use snafu::ensure; @@ -23,20 +25,30 @@ pub struct GrpcServer { query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef, shutdown_tx: Mutex>>, + runtime: Arc, } impl GrpcServer { - pub fn new(query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef) -> Self { + pub fn new( + query_handler: GrpcQueryHandlerRef, + admin_handler: GrpcAdminHandlerRef, + runtime: Arc, + ) -> Self { Self { query_handler, admin_handler, shutdown_tx: Mutex::new(None), + runtime, } } pub fn create_service(&self) -> greptime_server::GreptimeServer { let service = GrpcService { - handler: BatchHandler::new(self.query_handler.clone(), self.admin_handler.clone()), + handler: BatchHandler::new( + self.query_handler.clone(), + self.admin_handler.clone(), + self.runtime.clone(), + ), }; greptime_server::GreptimeServer::new(service) } diff --git a/src/servers/src/grpc/handler.rs 
b/src/servers/src/grpc/handler.rs index 264e37bd87..c37adaaaa5 100644 --- a/src/servers/src/grpc/handler.rs +++ b/src/servers/src/grpc/handler.rs @@ -1,4 +1,8 @@ +use std::sync::Arc; + use api::v1::{AdminResponse, BatchRequest, BatchResponse, DatabaseResponse}; +use common_runtime::Runtime; +use tokio::sync::oneshot; use crate::error::Result; use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef}; @@ -7,13 +11,19 @@ use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef}; pub struct BatchHandler { query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef, + runtime: Arc, } impl BatchHandler { - pub fn new(query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef) -> Self { + pub fn new( + query_handler: GrpcQueryHandlerRef, + admin_handler: GrpcAdminHandlerRef, + runtime: Arc, + ) -> Self { Self { query_handler, admin_handler, + runtime, } } @@ -30,12 +40,24 @@ impl BatchHandler { } batch_resp.admins.push(admin_resp); - for db_req in batch_req.databases { - for obj_expr in db_req.exprs { - let object_resp = self.query_handler.do_query(obj_expr).await?; - db_resp.results.push(object_resp); + let (tx, rx) = oneshot::channel(); + let query_handler = self.query_handler.clone(); + let _ = self.runtime.spawn(async move { + // execute request in another runtime to prevent the execution from being cancelled unexpectedly by the tonic runtime. + let mut result = vec![]; + for db_req in batch_req.databases { + for obj_expr in db_req.exprs { + let object_resp = query_handler.do_query(obj_expr).await; + + result.push(object_resp); + } } - } + // Ignore send result. Usually an error indicates the rx is dropped (request timed out). + let _ = tx.send(result); + }); + // Safety: An early-dropped tx usually indicates a serious problem (like panic). This unwrap + // is used to poison the upper layer. + db_resp.results = rx.await.unwrap().into_iter().collect::>()?; batch_resp.databases.push(db_resp); Ok(batch_resp) }