feat: grpc client support multi peers (#380)

* feat: grpc client use channel manager

* cr
This commit is contained in:
fys
2022-11-03 11:55:22 +08:00
committed by GitHub
parent 5abff7a536
commit 1fad67cf4d
19 changed files with 254 additions and 100 deletions

4
Cargo.lock generated
View File

@@ -836,7 +836,10 @@ dependencies = [
"datafusion",
"datanode",
"datatypes",
"enum_dispatch",
"parking_lot",
"prost 0.9.0",
"rand 0.8.5",
"snafu",
"substrait 0.1.0",
"substrait 0.2.0",
@@ -1803,6 +1806,7 @@ dependencies = [
"client",
"common-base",
"common-error",
"common-grpc",
"common-query",
"common-recordbatch",
"common-runtime",

View File

@@ -17,6 +17,9 @@ datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch =
"simd",
] }
datatypes = { path = "../datatypes" }
enum_dispatch = "0.3"
parking_lot = "0.12"
rand = "0.8"
snafu = { version = "0.7", features = ["backtraces"] }
tonic = "0.8"

View File

@@ -12,7 +12,7 @@ fn main() {
#[tokio::main]
async fn run() {
let client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
let db = Database::new("greptime", client);
let expr = InsertExpr {

View File

@@ -18,7 +18,7 @@ fn main() {
#[tokio::main]
async fn run() {
let client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
let create_table_expr = CreateExpr {
catalog_name: Some("greptime".to_string()),

View File

@@ -16,7 +16,7 @@ fn main() {
#[tokio::main]
async fn run() {
let client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
let db = Database::new("greptime", client);
let physical = mock_physical_plan();

View File

@@ -10,7 +10,7 @@ fn main() {
#[tokio::main]
async fn run() {
let client = Client::connect("http://127.0.0.1:3001").await.unwrap();
let client = Client::with_urls(vec!["127.0.0.1:3001"]);
let db = Database::new("greptime", client);
let sql = Select::Sql("select * from demo".to_string());

View File

@@ -22,10 +22,6 @@ impl Admin {
}
}
pub async fn start(&mut self, url: impl Into<String>) -> Result<()> {
self.client.start(url).await
}
pub async fn create(&self, expr: CreateExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,

View File

@@ -1,47 +1,96 @@
use api::v1::{greptime_client::GreptimeClient, *};
use snafu::{OptionExt, ResultExt};
use std::sync::Arc;
use api::v1::greptime_client::GreptimeClient;
use api::v1::*;
use common_grpc::channel_manager::ChannelManager;
use parking_lot::RwLock;
use snafu::OptionExt;
use snafu::ResultExt;
use tonic::transport::Channel;
use crate::error;
use crate::load_balance::LoadBalance;
use crate::load_balance::Loadbalancer;
use crate::Result;
#[derive(Clone, Debug, Default)]
pub struct Client {
client: Option<GreptimeClient<Channel>>,
inner: Arc<Inner>,
}
/// State shared by every clone of a `Client`.
///
/// `Client` keeps this behind an `Arc<Inner>`, so the fields themselves do
/// not need their own reference counting.
#[derive(Debug, Default)]
struct Inner {
    /// Hands out the channel used to talk to a given peer address.
    channel_manager: ChannelManager,
    /// Candidate peer addresses; replaced wholesale by `set_peers`.
    ///
    /// A plain `RwLock` is enough here: the lock is never shared on its own,
    /// only through the `Arc<Inner>` owned by `Client`.
    peers: RwLock<Vec<String>>,
    /// Strategy used to pick one peer out of `peers` for each request.
    load_balance: Loadbalancer,
}
impl Inner {
    /// Builds an `Inner` around `channel_manager`, leaving every other field
    /// at its default (no peers, default load balancer).
    fn with_manager(channel_manager: ChannelManager) -> Self {
        Self {
            channel_manager,
            ..Self::default()
        }
    }

    /// Replaces the whole peer list with `peers`.
    fn set_peers(&self, peers: Vec<String>) {
        *self.peers.write() = peers;
    }

    /// Asks the load balancer for one peer and returns an owned copy of its
    /// address, or `None` when the balancer yields nothing.
    fn get_peer(&self) -> Option<String> {
        let candidates = self.peers.read();
        let chosen = self.load_balance.get_peer(&candidates);
        chosen.cloned()
    }
}
impl Client {
pub async fn start(&mut self, url: impl Into<String>) -> Result<()> {
match self.client.as_ref() {
None => {
let url = url.into();
let client = GreptimeClient::connect(url.clone())
.await
.context(error::ConnectFailedSnafu { url })?;
self.client = Some(client);
Ok(())
}
Some(_) => error::IllegalGrpcClientStateSnafu {
err_msg: "already started",
}
.fail(),
}
pub fn new() -> Self {
Default::default()
}
pub fn with_client(client: GreptimeClient<Channel>) -> Self {
pub fn with_manager(channel_manager: ChannelManager) -> Self {
let inner = Arc::new(Inner::with_manager(channel_manager));
Self { inner }
}
/// Creates a client that uses a freshly created `ChannelManager` and the
/// given peer addresses.
///
/// `urls` is any slice-like collection of string-like addresses, e.g.
/// `vec!["127.0.0.1:3001"]`. This is a shorthand for
/// `with_manager_and_urls(ChannelManager::new(), urls)`.
pub fn with_urls<U, A>(urls: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
Self::with_manager_and_urls(ChannelManager::new(), urls)
}
pub fn with_manager_and_urls<U, A>(channel_manager: ChannelManager, urls: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let inner = Inner::with_manager(channel_manager);
let urls: Vec<String> = urls
.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect();
inner.set_peers(urls);
Self {
client: Some(client),
inner: Arc::new(inner),
}
}
pub async fn connect(url: impl Into<String>) -> Result<Self> {
let url = url.into();
let client = GreptimeClient::connect(url.clone())
.await
.context(error::ConnectFailedSnafu { url })?;
Ok(Self {
client: Some(client),
})
/// Replaces the client's peer list with `urls`.
///
/// Each address is copied into an owned `String`. No connection is opened
/// by this method itself; it only updates the list of peers.
pub fn start<U, A>(&self, urls: A)
where
    U: AsRef<str>,
    A: AsRef<[U]>,
{
    let mut peers: Vec<String> = Vec::new();
    for url in urls.as_ref() {
        peers.push(url.as_ref().to_string());
    }
    self.inner.set_peers(peers);
}
pub async fn admin(&self, req: AdminRequest) -> Result<AdminResponse> {
@@ -73,18 +122,59 @@ impl Client {
}
pub async fn batch(&self, req: BatchRequest) -> Result<BatchResponse> {
if let Some(client) = self.client.as_ref() {
let res = client
.clone()
.batch(req)
.await
.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
} else {
error::IllegalGrpcClientStateSnafu {
err_msg: "not started",
}
.fail()
let peer = self
.inner
.get_peer()
.context(error::IllegalGrpcClientStateSnafu {
err_msg: "No available peer found",
})?;
let mut client = self.make_client(peer)?;
let result = client.batch(req).await.context(error::TonicStatusSnafu)?;
Ok(result.into_inner())
}
fn make_client(&self, addr: impl AsRef<str>) -> Result<GreptimeClient<Channel>> {
let addr = addr.as_ref();
let channel = self
.inner
.channel_manager
.get(addr)
.context(error::CreateChannelSnafu { addr })?;
Ok(GreptimeClient::new(channel))
}
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::Inner;
    use crate::load_balance::Loadbalancer;

    /// Three fixed peer addresses used as test input.
    fn mock_peers() -> Vec<String> {
        ["127.0.0.1:3001", "127.0.0.1:3002", "127.0.0.1:3003"]
            .iter()
            .copied()
            .map(String::from)
            .collect()
    }

    /// A default `Inner` uses the random balancer and has no peers; once
    /// peers are set, every pick must come from that set.
    #[test]
    fn test_inner() {
        let inner = Inner::default();

        assert!(matches!(
            inner.load_balance,
            Loadbalancer::Random(crate::load_balance::Random)
        ));
        assert!(inner.get_peer().is_none());

        let peers = mock_peers();
        let known: HashSet<String> = peers.iter().cloned().collect();
        inner.set_peers(peers);

        for _ in 0..20 {
            let picked = inner.get_peer().unwrap();
            assert!(known.contains(&picked));
        }
    }
}

View File

@@ -43,10 +43,6 @@ impl Database {
}
}
pub async fn start(&mut self, url: impl Into<String>) -> Result<()> {
self.client.start(url).await
}
pub fn name(&self) -> &str {
&self.name
}

View File

@@ -85,6 +85,17 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display(
"Failed to create gRPC channel, peer address: {}, source: {}",
addr,
source
))]
CreateChannel {
addr: String,
#[snafu(backtrace)]
source: common_grpc::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -107,6 +118,7 @@ impl ErrorExt for Error {
source.status_code()
}
Error::CreateRecordBatches { source } => source.status_code(),
Error::CreateChannel { source, .. } => source.status_code(),
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
}
}

View File

@@ -2,6 +2,7 @@ pub mod admin;
mod client;
mod database;
mod error;
pub mod load_balance;
pub use self::{
client::Client,

View File

@@ -0,0 +1,52 @@
use enum_dispatch::enum_dispatch;
use rand::seq::SliceRandom;
/// Strategy for choosing one peer out of a list of candidate addresses.
///
/// The returned reference borrows from `peers`. Implementations return
/// `None` when they have no peer to offer.
#[enum_dispatch]
pub trait LoadBalance {
/// Picks one entry of `peers`, or `None` if no peer is selected.
fn get_peer<'a>(&self, peers: &'a [String]) -> Option<&'a String>;
}
/// The set of available [`LoadBalance`] strategies.
///
/// `enum_dispatch` is applied so the enum can be used through the
/// [`LoadBalance`] trait; each variant holds the strategy type of the same
/// name.
#[enum_dispatch(LoadBalance)]
#[derive(Debug)]
pub enum Loadbalancer {
/// Wraps the [`Random`] strategy.
Random,
}
impl Default for Loadbalancer {
    /// The default strategy is [`Random`].
    fn default() -> Self {
        Self::Random(Random)
    }
}
/// Stateless load-balancing strategy that picks a peer at random.
#[derive(Debug)]
pub struct Random;
impl LoadBalance for Random {
    /// Chooses one element of `peers` using the thread-local random number
    /// generator.
    fn get_peer<'a>(&self, peers: &'a [String]) -> Option<&'a String> {
        let mut rng = rand::thread_rng();
        peers.choose(&mut rng)
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::{LoadBalance, Random};

    /// Every peer returned by `Random` must be one of the candidates, and an
    /// empty candidate list must yield `None`.
    #[test]
    fn test_random_lb() {
        let peers = vec![
            "127.0.0.1:3001".to_string(),
            "127.0.0.1:3002".to_string(),
            "127.0.0.1:3003".to_string(),
            "127.0.0.1:3004".to_string(),
        ];
        let all: HashSet<String> = peers.iter().cloned().collect();

        let random = Random;
        for _ in 0..100 {
            let peer = random.get_peer(&peers).unwrap();
            // The result of `contains` was previously discarded, so the test
            // could never fail; assert on it.
            assert!(all.contains(peer));
        }

        let no_peers: Vec<String> = Vec::new();
        assert!(random.get_peer(&no_peers).is_none());
    }
}

View File

@@ -45,7 +45,7 @@ async fn setup_grpc_server(name: &str, port: usize) -> (String, TestGuard, Arc<G
async fn test_auto_create_table() {
let (addr, _guard, grpc_server) = setup_grpc_server("auto_create_table", 3991).await;
let grpc_client = Client::connect(format!("http://{}", addr)).await.unwrap();
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new("greptime", grpc_client);
insert_and_assert(&db).await;
@@ -111,7 +111,7 @@ fn expect_data() -> (Column, Column, Column, Column) {
async fn test_insert_and_select() {
let (addr, _guard, grpc_server) = setup_grpc_server("insert_and_select", 3990).await;
let grpc_client = Client::connect(format!("http://{}", addr)).await.unwrap();
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new("greptime", grpc_client.clone());
let admin = Admin::new("greptime", grpc_client);

View File

@@ -10,6 +10,7 @@ async-trait = "0.1"
client = { path = "../client" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }

View File

@@ -30,40 +30,37 @@ use crate::frontend::FrontendOptions;
pub(crate) type InstanceRef = Arc<Instance>;
#[derive(Default)]
pub struct Instance {
db: Database,
admin: Admin,
client: Client,
}
impl Instance {
pub(crate) fn new() -> Self {
let client = Client::default();
let db = Database::new("greptime", client.clone());
let admin = Admin::new("greptime", client);
Self { db, admin }
Default::default()
}
pub(crate) async fn start(&mut self, opts: &FrontendOptions) -> Result<()> {
let addr = opts.datanode_grpc_addr();
self.db
.start(addr.clone())
.await
.context(error::ConnectDatanodeSnafu { addr: addr.clone() })?;
self.admin
.start(addr.clone())
.await
.context(error::ConnectDatanodeSnafu { addr })?;
self.client.start(vec![addr]);
Ok(())
}
/// Returns a new `Database` handle built over a clone of this instance's
/// client.
// TODO(fys): the database name "greptime" is temporarily hard-coded.
pub fn database(&self) -> Database {
Database::new("greptime", self.client.clone())
}
/// Returns a new `Admin` handle built over a clone of this instance's
/// client.
// TODO(fys): the name "greptime" is temporarily hard-coded.
pub fn admin(&self) -> Admin {
Admin::new("greptime", self.client.clone())
}
}
#[cfg(test)]
impl Instance {
pub fn with_client(client: Client) -> Self {
Self {
db: Database::new("greptime", client.clone()),
admin: Admin::new("greptime", client),
}
Self { client }
}
}
@@ -85,7 +82,7 @@ impl SqlQueryHandler for Instance {
match stmt {
Statement::Query(_) => self
.db
.database()
.select(Select::Sql(query.to_string()))
.await
.and_then(|object_result| object_result.try_into()),
@@ -96,7 +93,7 @@ impl SqlQueryHandler for Instance {
expr: Some(insert_expr::Expr::Sql(query.to_string())),
options: HashMap::default(),
};
self.db
self.database()
.insert(expr)
.await
.and_then(|object_result| object_result.try_into())
@@ -105,7 +102,7 @@ impl SqlQueryHandler for Instance {
let expr = create_to_expr(create)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
self.admin
self.admin()
.create(expr)
.await
.and_then(admin_result_to_output)
@@ -235,7 +232,7 @@ fn columns_to_expr(column_defs: &[ColumnDef]) -> Result<Vec<GrpcColumnDef>> {
#[async_trait]
impl GrpcQueryHandler for Instance {
async fn do_query(&self, query: ObjectExpr) -> server_error::Result<GrpcObjectResult> {
self.db
self.database()
.object(query.clone())
.await
.map_err(BoxedError::new)
@@ -248,7 +245,7 @@ impl GrpcQueryHandler for Instance {
#[async_trait]
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, expr: AdminExpr) -> server_error::Result<AdminResult> {
self.admin
self.admin()
.do_request(expr.clone())
.await
.map_err(BoxedError::new)

View File

@@ -11,7 +11,7 @@ use crate::instance::Instance;
impl InfluxdbLineProtocolHandler for Instance {
async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> {
let exprs: Vec<InsertExpr> = request.try_into()?;
self.db
self.database()
.batch_insert(exprs)
.await
.map_err(BoxedError::new)

View File

@@ -28,7 +28,7 @@ impl Instance {
async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> {
let expr = data_point.as_grpc_insert();
let result = self.db.insert(expr.clone()).await;
let result = self.database().insert(expr.clone()).await;
let object_result = match result {
Ok(result) => result,

View File

@@ -92,7 +92,7 @@ impl PrometheusProtocolHandler for Instance {
async fn write(&self, request: WriteRequest) -> ServerResult<()> {
let exprs = prometheus::write_request_to_insert_exprs(request)?;
self.db
self.database()
.batch_insert(exprs)
.await
.map_err(BoxedError::new)
@@ -107,7 +107,7 @@ impl PrometheusProtocolHandler for Instance {
let response_type = negotiate_response_type(&request.accepted_response_types)?;
// TODO(dennis): use read_hints to speedup query if possible
let results = handle_remote_queries(&self.db, &request.queries).await?;
let results = handle_remote_queries(&self.database(), &request.queries).await?;
match response_type {
ResponseType::Samples => {

View File

@@ -1,10 +1,10 @@
use std::sync::Arc;
use api::v1::greptime_client::GreptimeClient;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use datanode::instance::Instance as DatanodeInstance;
use servers::grpc::GrpcServer;
use tonic::transport::{Endpoint, Server};
use tonic::transport::Server;
use tower::service_fn;
use crate::instance::Instance;
@@ -37,25 +37,27 @@ pub(crate) async fn create_frontend_instance() -> Arc<Instance> {
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
// "http://[::]:50051" is just a placeholder, does not actually connect to it,
// see https://github.com/hyperium/tonic/issues/727#issuecomment-881532934
let channel = Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(service_fn(move |_| {
let client = client.take();
let addr = "[::].50051";
let channel_manager = ChannelManager::new();
channel_manager
.reset_with_connector(
addr,
service_fn(move |_| {
let client = client.take();
async move {
if let Some(client) = client {
Ok(client)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Client already taken",
))
async move {
if let Some(client) = client {
Ok(client)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Client already taken",
))
}
}
}
}))
.await
}),
)
.unwrap();
let client = Client::with_client(GreptimeClient::new(channel));
let client = Client::with_manager_and_urls(channel_manager, vec![addr]);
Arc::new(Instance::with_client(client))
}