From 1fad67cf4dcc8ec843cb3ad166cb48787df31ac6 Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Thu, 3 Nov 2022 11:55:22 +0800 Subject: [PATCH] feat: grpc client support multi peers (#380) * feat: grpc client use channel manager * cr --- Cargo.lock | 4 + src/client/Cargo.toml | 3 + src/client/examples/insert.rs | 2 +- src/client/examples/logical.rs | 2 +- src/client/examples/physical.rs | 2 +- src/client/examples/select.rs | 2 +- src/client/src/admin.rs | 4 - src/client/src/client.rs | 170 ++++++++++++++++++------ src/client/src/database.rs | 4 - src/client/src/error.rs | 12 ++ src/client/src/lib.rs | 1 + src/client/src/load_balance.rs | 52 ++++++++ src/datanode/src/tests/grpc_test.rs | 4 +- src/frontend/Cargo.toml | 1 + src/frontend/src/instance.rs | 43 +++--- src/frontend/src/instance/influxdb.rs | 2 +- src/frontend/src/instance/opentsdb.rs | 2 +- src/frontend/src/instance/prometheus.rs | 4 +- src/frontend/src/tests.rs | 40 +++--- 19 files changed, 254 insertions(+), 100 deletions(-) create mode 100644 src/client/src/load_balance.rs diff --git a/Cargo.lock b/Cargo.lock index 8cb1fd7661..ba78692f7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -836,7 +836,10 @@ dependencies = [ "datafusion", "datanode", "datatypes", + "enum_dispatch", + "parking_lot", "prost 0.9.0", + "rand 0.8.5", "snafu", "substrait 0.1.0", "substrait 0.2.0", @@ -1803,6 +1806,7 @@ dependencies = [ "client", "common-base", "common-error", + "common-grpc", "common-query", "common-recordbatch", "common-runtime", diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index d54cddba42..541b6d2dd4 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -17,6 +17,9 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "simd", ] } datatypes = { path = "../datatypes" } +enum_dispatch = "0.3" +parking_lot = "0.12" +rand = "0.8" snafu = { version = "0.7", features = ["backtraces"] } tonic = "0.8" diff --git a/src/client/examples/insert.rs b/src/client/examples/insert.rs index e3e2544842..2755ec10b7 100644 --- a/src/client/examples/insert.rs +++ b/src/client/examples/insert.rs @@ -12,7 +12,7 @@ fn main() { #[tokio::main] async fn run() { - let client = Client::connect("http://127.0.0.1:3001").await.unwrap(); + let client = Client::with_urls(vec!["127.0.0.1:3001"]); let db = Database::new("greptime", client); let expr = InsertExpr { diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 6b0f8233cc..44c967c08f 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -18,7 +18,7 @@ fn main() { #[tokio::main] async fn run() { - let client = Client::connect("http://127.0.0.1:3001").await.unwrap(); + let client = Client::with_urls(vec!["127.0.0.1:3001"]); let create_table_expr = CreateExpr { catalog_name: Some("greptime".to_string()), diff --git a/src/client/examples/physical.rs b/src/client/examples/physical.rs index 44b1e22f27..1e866fd80f 100644 --- a/src/client/examples/physical.rs +++ b/src/client/examples/physical.rs @@ -16,7 +16,7 @@ fn main() { #[tokio::main] async fn run() { - let client = Client::connect("http://127.0.0.1:3001").await.unwrap(); + let client = Client::with_urls(vec!["127.0.0.1:3001"]); let db = Database::new("greptime", client); let physical = mock_physical_plan(); diff --git a/src/client/examples/select.rs b/src/client/examples/select.rs index 442360bf37..160c217fc5 100644 --- a/src/client/examples/select.rs +++ b/src/client/examples/select.rs @@ -10,7 +10,7 @@ fn main() { #[tokio::main] async fn run() { - let client = Client::connect("http://127.0.0.1:3001").await.unwrap(); + let client = Client::with_urls(vec!["127.0.0.1:3001"]); let db = Database::new("greptime", client); let sql = Select::Sql("select * from demo".to_string()); diff --git a/src/client/src/admin.rs b/src/client/src/admin.rs index 717bea7b19..30be0bc9a5 100644 --- a/src/client/src/admin.rs +++ b/src/client/src/admin.rs @@ -22,10 +22,6 @@ impl Admin { } } - pub async fn start(&mut self, url: impl Into) -> Result<()> { - self.client.start(url).await - } - pub async fn create(&self, expr: CreateExpr) -> Result { let header = ExprHeader { version: PROTOCOL_VERSION, diff --git a/src/client/src/client.rs b/src/client/src/client.rs index d9a55b92f8..05bfb4c0c3 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -1,47 +1,96 @@ -use api::v1::{greptime_client::GreptimeClient, *}; -use snafu::{OptionExt, ResultExt}; +use std::sync::Arc; + +use api::v1::greptime_client::GreptimeClient; +use api::v1::*; +use common_grpc::channel_manager::ChannelManager; +use parking_lot::RwLock; +use snafu::OptionExt; +use snafu::ResultExt; use tonic::transport::Channel; use crate::error; +use crate::load_balance::LoadBalance; +use crate::load_balance::Loadbalancer; use crate::Result; #[derive(Clone, Debug, Default)] pub struct Client { - client: Option>, + inner: Arc, +} + +#[derive(Debug, Default)] +struct Inner { + channel_manager: ChannelManager, + peers: Arc>>, + load_balance: Loadbalancer, +} + +impl Inner { + fn with_manager(channel_manager: ChannelManager) -> Self { + Self { + channel_manager, + ..Default::default() + } + } + + fn set_peers(&self, peers: Vec) { + let mut guard = self.peers.write(); + *guard = peers; + } + + fn get_peer(&self) -> Option { + let guard = self.peers.read(); + self.load_balance.get_peer(&guard).cloned() + } } impl Client { - pub async fn start(&mut self, url: impl Into) -> Result<()> { - match self.client.as_ref() { - None => { - let url = url.into(); - let client = GreptimeClient::connect(url.clone()) - .await - .context(error::ConnectFailedSnafu { url })?; - self.client = Some(client); - Ok(()) - } - Some(_) => error::IllegalGrpcClientStateSnafu { - err_msg: "already started", - } - .fail(), - } + pub fn new() -> Self { + Default::default() } - pub fn with_client(client: GreptimeClient) -> Self { + pub fn with_manager(channel_manager: ChannelManager) -> Self { + let inner = Arc::new(Inner::with_manager(channel_manager)); + Self { inner } + } + + pub fn with_urls(urls: A) -> Self + where + U: AsRef, + A: AsRef<[U]>, + { + Self::with_manager_and_urls(ChannelManager::new(), urls) + } + + pub fn with_manager_and_urls(channel_manager: ChannelManager, urls: A) -> Self + where + U: AsRef, + A: AsRef<[U]>, + { + let inner = Inner::with_manager(channel_manager); + let urls: Vec = urls + .as_ref() + .iter() + .map(|peer| peer.as_ref().to_string()) + .collect(); + inner.set_peers(urls); Self { - client: Some(client), + inner: Arc::new(inner), } } - pub async fn connect(url: impl Into) -> Result { - let url = url.into(); - let client = GreptimeClient::connect(url.clone()) - .await - .context(error::ConnectFailedSnafu { url })?; - Ok(Self { - client: Some(client), - }) + pub fn start(&self, urls: A) + where + U: AsRef, + A: AsRef<[U]>, + { + let urls: Vec = urls + .as_ref() + .iter() + .map(|peer| peer.as_ref().to_string()) + .collect(); + + self.inner.set_peers(urls); } pub async fn admin(&self, req: AdminRequest) -> Result { @@ -73,18 +122,59 @@ impl Client { } pub async fn batch(&self, req: BatchRequest) -> Result { - if let Some(client) = self.client.as_ref() { - let res = client - .clone() - .batch(req) - .await - .context(error::TonicStatusSnafu)?; - Ok(res.into_inner()) - } else { - error::IllegalGrpcClientStateSnafu { - err_msg: "not started", - } - .fail() + let peer = self + .inner + .get_peer() + .context(error::IllegalGrpcClientStateSnafu { + err_msg: "No available peer found", + })?; + let mut client = self.make_client(peer)?; + let result = client.batch(req).await.context(error::TonicStatusSnafu)?; + Ok(result.into_inner()) + } + + fn make_client(&self, addr: impl AsRef) -> Result> { + let addr = addr.as_ref(); + let channel = self + .inner + .channel_manager + .get(addr) + .context(error::CreateChannelSnafu { addr })?; + Ok(GreptimeClient::new(channel)) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::Inner; + use crate::load_balance::Loadbalancer; + + fn mock_peers() -> Vec { + vec![ + "127.0.0.1:3001".to_string(), + "127.0.0.1:3002".to_string(), + "127.0.0.1:3003".to_string(), + ] + } + + #[test] + fn test_inner() { + let inner = Inner::default(); + + assert!(matches!( + inner.load_balance, + Loadbalancer::Random(crate::load_balance::Random) + )); + assert!(inner.get_peer().is_none()); + + let peers = mock_peers(); + inner.set_peers(peers.clone()); + let all: HashSet = peers.into_iter().collect(); + + for _ in 0..20 { + assert!(all.contains(&inner.get_peer().unwrap())); } } } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 2a1d8dc76e..bff78ebda5 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -43,10 +43,6 @@ impl Database { } } - pub async fn start(&mut self, url: impl Into) -> Result<()> { - self.client.start(url).await - } - pub fn name(&self) -> &str { &self.name } diff --git a/src/client/src/error.rs b/src/client/src/error.rs index 7169150ae4..c8c4517667 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -85,6 +85,17 @@ pub enum Error { #[snafu(backtrace)] source: datatypes::error::Error, }, + + #[snafu(display( + "Failed to create gRPC channel, peer address: {}, source: {}", + addr, + source + ))] + CreateChannel { + addr: String, + #[snafu(backtrace)] + source: common_grpc::error::Error, + }, } pub type Result = std::result::Result; @@ -107,6 +118,7 @@ impl ErrorExt for Error { source.status_code() } Error::CreateRecordBatches { source } => source.status_code(), + Error::CreateChannel { source, .. } => source.status_code(), Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, } } diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index b39ea34e20..5eea78394b 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -2,6 +2,7 @@ pub mod admin; mod client; mod database; mod error; +pub mod load_balance; pub use self::{ client::Client, diff --git a/src/client/src/load_balance.rs b/src/client/src/load_balance.rs new file mode 100644 index 0000000000..60e37a4e1b --- /dev/null +++ b/src/client/src/load_balance.rs @@ -0,0 +1,52 @@ +use enum_dispatch::enum_dispatch; +use rand::seq::SliceRandom; + +#[enum_dispatch] +pub trait LoadBalance { + fn get_peer<'a>(&self, peers: &'a [String]) -> Option<&'a String>; +} + +#[enum_dispatch(LoadBalance)] +#[derive(Debug)] +pub enum Loadbalancer { + Random, +} + +impl Default for Loadbalancer { + fn default() -> Self { + Loadbalancer::from(Random) + } +} + +#[derive(Debug)] +pub struct Random; + +impl LoadBalance for Random { + fn get_peer<'a>(&self, peers: &'a [String]) -> Option<&'a String> { + peers.choose(&mut rand::thread_rng()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::{LoadBalance, Random}; + + #[test] + fn test_random_lb() { + let peers = vec![ + "127.0.0.1:3001".to_string(), + "127.0.0.1:3002".to_string(), + "127.0.0.1:3003".to_string(), + "127.0.0.1:3004".to_string(), + ]; + let all: HashSet = peers.clone().into_iter().collect(); + + let random = Random; + for _ in 0..100 { + let peer = random.get_peer(&peers).unwrap(); + all.contains(peer); + } + } +} diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index b0455b8ce7..f5a2aa79a6 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -45,7 +45,7 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc (Column, Column, Column, Column) { async fn test_insert_and_select() { let (addr, _guard, grpc_server) = setup_grpc_server("insert_and_select", 3990).await; - let grpc_client = Client::connect(format!("http://{}", addr)).await.unwrap(); + let grpc_client = Client::with_urls(vec![addr]); let db = Database::new("greptime", grpc_client.clone()); let admin = Admin::new("greptime", grpc_client); diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index af83b960c6..a1ace3bde9 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -10,6 +10,7 @@ async-trait = "0.1" client = { path = "../client" } common-base = { path = "../common/base" } common-error = { path = "../common/error" } +common-grpc = { path = "../common/grpc" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 29ce0f83db..5d72af5099 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -30,40 +30,37 @@ use crate::frontend::FrontendOptions; pub(crate) type InstanceRef = Arc; +#[derive(Default)] pub struct Instance { - db: Database, - admin: Admin, + client: Client, } impl Instance { pub(crate) fn new() -> Self { - let client = Client::default(); - let db = Database::new("greptime", client.clone()); - let admin = Admin::new("greptime", client); - Self { db, admin } + Default::default() } pub(crate) async fn start(&mut self, opts: &FrontendOptions) -> Result<()> { let addr = opts.datanode_grpc_addr(); - self.db - .start(addr.clone()) - .await - .context(error::ConnectDatanodeSnafu { addr: addr.clone() })?; - self.admin - .start(addr.clone()) - .await - .context(error::ConnectDatanodeSnafu { addr })?; + self.client.start(vec![addr]); Ok(()) } + + // TODO(fys): temporarily hard code + pub fn database(&self) -> Database { + Database::new("greptime", self.client.clone()) + } + + // TODO(fys): temporarily hard code + pub fn admin(&self) -> Admin { + Admin::new("greptime", self.client.clone()) + } } #[cfg(test)] impl Instance { pub fn with_client(client: Client) -> Self { - Self { - db: Database::new("greptime", client.clone()), - admin: Admin::new("greptime", client), - } + Self { client } } } @@ -85,7 +82,7 @@ impl SqlQueryHandler for Instance { match stmt { Statement::Query(_) => self - .db + .database() .select(Select::Sql(query.to_string())) .await .and_then(|object_result| object_result.try_into()), @@ -96,7 +93,7 @@ impl SqlQueryHandler for Instance { expr: Some(insert_expr::Expr::Sql(query.to_string())), options: HashMap::default(), }; - self.db + self.database() .insert(expr) .await .and_then(|object_result| object_result.try_into()) @@ -105,7 +102,7 @@ impl SqlQueryHandler for Instance { let expr = create_to_expr(create) .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query })?; - self.admin + self.admin() .create(expr) .await .and_then(admin_result_to_output) @@ -235,7 +232,7 @@ fn columns_to_expr(column_defs: &[ColumnDef]) -> Result> { #[async_trait] impl GrpcQueryHandler for Instance { async fn do_query(&self, query: ObjectExpr) -> server_error::Result { - self.db + self.database() .object(query.clone()) .await .map_err(BoxedError::new) @@ -248,7 +245,7 @@ impl GrpcQueryHandler for Instance { #[async_trait] impl GrpcAdminHandler for Instance { async fn exec_admin_request(&self, expr: AdminExpr) -> server_error::Result { - self.admin + self.admin() .do_request(expr.clone()) .await .map_err(BoxedError::new) diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 74ad15f98c..09e26bfa24 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -11,7 +11,7 @@ use crate::instance::Instance; impl InfluxdbLineProtocolHandler for Instance { async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> { let exprs: Vec = request.try_into()?; - self.db + self.database() .batch_insert(exprs) .await .map_err(BoxedError::new) diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index f7ac68993f..d2b6b13502 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -28,7 +28,7 @@ impl Instance { async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> { let expr = data_point.as_grpc_insert(); - let result = self.db.insert(expr.clone()).await; + let result = self.database().insert(expr.clone()).await; let object_result = match result { Ok(result) => result, diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index bc87ed3ab8..dc466aaa1c 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -92,7 +92,7 @@ impl PrometheusProtocolHandler for Instance { async fn write(&self, request: WriteRequest) -> ServerResult<()> { let exprs = prometheus::write_request_to_insert_exprs(request)?; - self.db + self.database() .batch_insert(exprs) .await .map_err(BoxedError::new) @@ -107,7 +107,7 @@ impl PrometheusProtocolHandler for Instance { let response_type = negotiate_response_type(&request.accepted_response_types)?; // TODO(dennis): use read_hints to speedup query if possible - let results = handle_remote_queries(&self.db, &request.queries).await?; + let results = handle_remote_queries(&self.database(), &request.queries).await?; match response_type { ResponseType::Samples => { diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index bc2b473e97..a1317d7bbf 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -1,10 +1,10 @@ use std::sync::Arc; -use api::v1::greptime_client::GreptimeClient; use client::Client; +use common_grpc::channel_manager::ChannelManager; use datanode::instance::Instance as DatanodeInstance; use servers::grpc::GrpcServer; -use tonic::transport::{Endpoint, Server}; +use tonic::transport::Server; use tower::service_fn; use crate::instance::Instance; @@ -37,25 +37,27 @@ pub(crate) async fn create_frontend_instance() -> Arc { // on the first attempt to connect. All other attempts will fail. let mut client = Some(client); // "http://[::]:50051" is just a placeholder, does not actually connect to it, - // see https://github.com/hyperium/tonic/issues/727#issuecomment-881532934 - let channel = Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(service_fn(move |_| { - let client = client.take(); + let addr = "[::].50051"; + let channel_manager = ChannelManager::new(); + channel_manager + .reset_with_connector( + addr, + service_fn(move |_| { + let client = client.take(); - async move { - if let Some(client) = client { - Ok(client) - } else { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "Client already taken", - )) + async move { + if let Some(client) = client { + Ok(client) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Client already taken", + )) + } } - } - })) - .await + }), + ) .unwrap(); - let client = Client::with_client(GreptimeClient::new(channel)); + let client = Client::with_manager_and_urls(channel_manager, vec![addr]); Arc::new(Instance::with_client(client)) }