fix: detach grpc tasks to another runtime (#376)

* fix: detach grpc tasks to another runtime

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add runtime size options

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* group an obj-req into one task

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* make nitpicking CRer happy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2022-11-03 17:24:15 +08:00
committed by GitHub
parent 77233c20e1
commit 9fd2d4e8db
12 changed files with 115 additions and 20 deletions

View File

@@ -1,6 +1,7 @@
http_addr = '0.0.0.0:3000'
rpc_addr = '0.0.0.0:3001'
wal_dir = '/tmp/greptimedb/wal'
rpc_runtime_size = 8
mysql_addr = '0.0.0.0:3306'
mysql_runtime_size = 4

View File

@@ -1,5 +1,6 @@
use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
@@ -74,7 +75,10 @@ impl TryFrom<StartCommand> for FrontendOptions {
opts.http_addr = Some(addr);
}
if let Some(addr) = cmd.grpc_addr {
opts.grpc_addr = Some(addr);
opts.grpc_options = Some(GrpcOptions {
addr,
..Default::default()
});
}
if let Some(addr) = cmd.mysql_addr {
opts.mysql_options = Some(MysqlOptions {
@@ -130,7 +134,10 @@ mod tests {
);
let default_opts = FrontendOptions::default();
assert_eq!(opts.grpc_addr, default_opts.grpc_addr);
assert_eq!(
opts.grpc_options.unwrap().addr,
default_opts.grpc_options.unwrap().addr
);
assert_eq!(
opts.mysql_options.as_ref().unwrap().runtime_size,
default_opts.mysql_options.as_ref().unwrap().runtime_size

View File

@@ -24,10 +24,11 @@ impl Default for ObjectStoreConfig {
pub struct DatanodeOptions {
pub http_addr: String,
pub rpc_addr: String,
pub rpc_runtime_size: usize,
pub mysql_addr: String,
pub mysql_runtime_size: u32,
pub mysql_runtime_size: usize,
pub postgres_addr: String,
pub postgres_runtime_size: u32,
pub postgres_runtime_size: usize,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
}
@@ -37,6 +38,7 @@ impl Default for DatanodeOptions {
Self {
http_addr: "0.0.0.0:3000".to_string(),
rpc_addr: "0.0.0.0:3001".to_string(),
rpc_runtime_size: 8,
mysql_addr: "0.0.0.0:3306".to_string(),
mysql_runtime_size: 2,
postgres_addr: "0.0.0.0:5432".to_string(),

View File

@@ -40,9 +40,16 @@ impl Services {
.build()
.context(error::RuntimeResourceSnafu)?,
);
let grpc_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.rpc_runtime_size as usize)
.thread_name("grpc-io-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
Ok(Self {
http_server: HttpServer::new(instance.clone()),
grpc_server: GrpcServer::new(instance.clone(), instance.clone()),
grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime),
mysql_server: MysqlServer::create_server(instance.clone(), mysql_io_runtime),
postgres_server: Box::new(PostgresServer::new(instance, postgres_io_runtime)),
})

View File

@@ -11,6 +11,7 @@ use api::v1::{
};
use client::admin::Admin;
use client::{Client, Database, ObjectResult};
use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::GrpcServer;
use servers::server::Server;
@@ -27,9 +28,17 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc<G
instance.start().await.unwrap();
let addr_cloned = addr.clone();
let grpc_server = Arc::new(GrpcServer::new(instance.clone(), instance));
let runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
.thread_name("grpc-handlers")
.build()
.unwrap(),
);
let grpc_server = Arc::new(GrpcServer::new(instance.clone(), instance, runtime));
let grpc_server_clone = grpc_server.clone();
tokio::spawn(async move {
let addr = addr_cloned.parse::<SocketAddr>().unwrap();
grpc_server_clone.start(addr).await.unwrap()

View File

@@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::grpc::GrpcOptions;
use crate::influxdb::InfluxdbOptions;
use crate::instance::Instance;
use crate::mysql::MysqlOptions;
@@ -15,7 +16,7 @@ use crate::server::Services;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FrontendOptions {
pub http_addr: Option<String>,
pub grpc_addr: Option<String>,
pub grpc_options: Option<GrpcOptions>,
pub mysql_options: Option<MysqlOptions>,
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
@@ -27,7 +28,7 @@ impl Default for FrontendOptions {
fn default() -> Self {
Self {
http_addr: Some("0.0.0.0:4000".to_string()),
grpc_addr: Some("0.0.0.0:4001".to_string()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),

16
src/frontend/src/grpc.rs Normal file
View File

@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GrpcOptions {
pub addr: String,
pub runtime_size: usize,
}
impl Default for GrpcOptions {
fn default() -> Self {
Self {
addr: "0.0.0.0:4001".to_string(),
runtime_size: 8,
}
}
}

View File

@@ -2,6 +2,7 @@
pub mod error;
pub mod frontend;
pub mod grpc;
pub mod influxdb;
pub mod instance;
pub mod mysql;

View File

@@ -21,10 +21,18 @@ pub(crate) struct Services;
impl Services {
pub(crate) async fn start(opts: &FrontendOptions, instance: InstanceRef) -> Result<()> {
let grpc_server_and_addr = if let Some(grpc_addr) = &opts.grpc_addr {
let grpc_addr = parse_addr(grpc_addr)?;
let grpc_server_and_addr = if let Some(opts) = &opts.grpc_options {
let grpc_addr = parse_addr(&opts.addr)?;
let grpc_server = GrpcServer::new(instance.clone(), instance.clone());
let grpc_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.runtime_size)
.thread_name("grpc-handlers")
.build()
.context(error::RuntimeResourceSnafu)?,
);
let grpc_server = GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime);
Some((Box::new(grpc_server) as _, grpc_addr))
} else {

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_runtime::Builder as RuntimeBuilder;
use datanode::instance::Instance as DatanodeInstance;
use servers::grpc::GrpcServer;
use tonic::transport::Server;
@@ -22,10 +23,18 @@ pub(crate) async fn create_frontend_instance() -> Arc<Instance> {
let (client, server) = tokio::io::duplex(1024);
let runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(2)
.thread_name("grpc-handlers")
.build()
.unwrap(),
);
// create a mock datanode grpc service, see example here:
// https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs
let datanode_service =
GrpcServer::new(datanode_instance.clone(), datanode_instance).create_service();
GrpcServer::new(datanode_instance.clone(), datanode_instance, runtime).create_service();
tokio::spawn(async move {
Server::builder()
.add_service(datanode_service)

View File

@@ -1,9 +1,11 @@
pub mod handler;
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::{greptime_server, BatchRequest, BatchResponse};
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::info;
use futures::FutureExt;
use snafu::ensure;
@@ -23,20 +25,30 @@ pub struct GrpcServer {
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
shutdown_tx: Mutex<Option<Sender<()>>>,
runtime: Arc<Runtime>,
}
impl GrpcServer {
pub fn new(query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef) -> Self {
pub fn new(
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
runtime: Arc<Runtime>,
) -> Self {
Self {
query_handler,
admin_handler,
shutdown_tx: Mutex::new(None),
runtime,
}
}
pub fn create_service(&self) -> greptime_server::GreptimeServer<GrpcService> {
let service = GrpcService {
handler: BatchHandler::new(self.query_handler.clone(), self.admin_handler.clone()),
handler: BatchHandler::new(
self.query_handler.clone(),
self.admin_handler.clone(),
self.runtime.clone(),
),
};
greptime_server::GreptimeServer::new(service)
}

View File

@@ -1,4 +1,8 @@
use std::sync::Arc;
use api::v1::{AdminResponse, BatchRequest, BatchResponse, DatabaseResponse};
use common_runtime::Runtime;
use tokio::sync::oneshot;
use crate::error::Result;
use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef};
@@ -7,13 +11,19 @@ use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef};
pub struct BatchHandler {
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
runtime: Arc<Runtime>,
}
impl BatchHandler {
pub fn new(query_handler: GrpcQueryHandlerRef, admin_handler: GrpcAdminHandlerRef) -> Self {
pub fn new(
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
runtime: Arc<Runtime>,
) -> Self {
Self {
query_handler,
admin_handler,
runtime,
}
}
@@ -30,12 +40,24 @@ impl BatchHandler {
}
batch_resp.admins.push(admin_resp);
for db_req in batch_req.databases {
for obj_expr in db_req.exprs {
let object_resp = self.query_handler.do_query(obj_expr).await?;
db_resp.results.push(object_resp);
let (tx, rx) = oneshot::channel();
let query_handler = self.query_handler.clone();
let _ = self.runtime.spawn(async move {
// execute request in another runtime to prevent the execution from being cancelled unexpected by tonic runtime.
let mut result = vec![];
for db_req in batch_req.databases {
for obj_expr in db_req.exprs {
let object_resp = query_handler.do_query(obj_expr).await;
result.push(object_resp);
}
}
}
// Ignore send result. Usually an error indicates the rx is dropped (request timeouted).
let _ = tx.send(result);
});
// Safety: An early-dropped tx usually indicates a serious problem (like panic). This unwrap
// is used to poison the upper layer.
db_resp.results = rx.await.unwrap().into_iter().collect::<Result<_>>()?;
batch_resp.databases.push(db_resp);
Ok(batch_resp)
}