feat: FlownodeClient (#4206)

* feat: FlownodeClient

* chore: remove wrong doc

* fix: debug impl for NodeClients

* chore: rename `FlownodeClient` to `FlowRequester`
This commit is contained in:
discord9
2024-06-25 16:40:24 +08:00
committed by GitHub
parent 1e815dddf1
commit 82f6373574
10 changed files with 151 additions and 23 deletions

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use api::v1::flow::flow_client::FlowClient as PbFlowClient;
use api::v1::health_check_client::HealthCheckClient;
use api::v1::prometheus_gateway_client::PrometheusGatewayClient;
use api::v1::region::region_client::RegionClient as PbRegionClient;
@@ -183,6 +184,16 @@ impl Client {
Ok((addr, client))
}
pub(crate) fn raw_flow_client(&self) -> Result<(String, PbFlowClient<Channel>)> {
let (addr, channel) = self.find_channel()?;
let client = PbFlowClient::new(channel)
.max_decoding_message_size(self.max_grpc_recv_message_size())
.max_encoding_message_size(self.max_grpc_send_message_size())
.accept_compressed(CompressionEncoding::Zstd)
.send_compressed(CompressionEncoding::Zstd);
Ok((addr, client))
}
pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
let (_, channel) = self.find_channel()?;
let client = PrometheusGatewayClient::new(channel)

View File

@@ -21,43 +21,45 @@ use common_meta::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use common_meta::peer::Peer;
use moka::future::{Cache, CacheBuilder};
use crate::flow::FlowRequester;
use crate::region::RegionRequester;
use crate::Client;
pub struct DatanodeClients {
pub struct NodeClients {
channel_manager: ChannelManager,
clients: Cache<Peer, Client>,
}
impl Default for DatanodeClients {
impl Default for NodeClients {
fn default() -> Self {
Self::new(ChannelConfig::new())
}
}
impl Debug for DatanodeClients {
impl Debug for NodeClients {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DatanodeClients")
f.debug_struct("NodeClients")
.field("channel_manager", &self.channel_manager)
.finish()
}
}
#[async_trait::async_trait]
impl NodeManager for DatanodeClients {
impl NodeManager for NodeClients {
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
let client = self.get_client(datanode).await;
Arc::new(RegionRequester::new(client))
}
async fn flownode(&self, _node: &Peer) -> FlownodeRef {
// TODO(weny): Support it.
unimplemented!()
async fn flownode(&self, flownode: &Peer) -> FlownodeRef {
let client = self.get_client(flownode).await;
Arc::new(FlowRequester::new(client))
}
}
impl DatanodeClients {
impl NodeClients {
pub fn new(config: ChannelConfig) -> Self {
Self {
channel_manager: ChannelManager::with_config(config),

View File

@@ -98,6 +98,15 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to request FlowServer {}, code: {}", addr, code))]
FlowServer {
addr: String,
code: Code,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
// Server error carried in Tonic Status's metadata.
#[snafu(display("{}", msg))]
Server {
@@ -136,7 +145,8 @@ impl ErrorExt for Error {
Error::Server { code, .. } => *code,
Error::FlightGet { source, .. }
| Error::HandleRequest { source, .. }
| Error::RegionServer { source, .. } => source.status_code(),
| Error::RegionServer { source, .. }
| Error::FlowServer { source, .. } => source.status_code(),
Error::CreateChannel { source, .. }
| Error::ConvertFlightData { source, .. }
| Error::CreateTlsChannel { source, .. } => source.status_code(),

104
src/client/src/flow.rs Normal file
View File

@@ -0,0 +1,104 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::node_manager::Flownode;
use snafu::{location, Location, ResultExt};
use crate::error::Result;
use crate::Client;
#[derive(Debug)]
pub struct FlowRequester {
client: Client,
}
#[async_trait::async_trait]
impl Flownode for FlowRequester {
async fn handle(&self, request: FlowRequest) -> common_meta::error::Result<FlowResponse> {
self.handle_inner(request)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
async fn handle_inserts(
&self,
request: InsertRequests,
) -> common_meta::error::Result<FlowResponse> {
self.handle_inserts_inner(request)
.await
.map_err(BoxedError::new)
.context(common_meta::error::ExternalSnafu)
}
}
impl FlowRequester {
pub fn new(client: Client) -> Self {
Self { client }
}
async fn handle_inner(&self, request: FlowRequest) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let response = client
.handle_create_remove(request)
.await
.map_err(|e| {
let code = e.code();
let err: crate::error::Error = e.into();
crate::error::Error::FlowServer {
addr,
code,
source: BoxedError::new(err),
location: location!(),
}
})?
.into_inner();
Ok(response)
}
async fn handle_inserts_inner(&self, request: InsertRequests) -> Result<FlowResponse> {
let (addr, mut client) = self.client.raw_flow_client()?;
let requests = api::v1::flow::InsertRequests {
requests: request
.requests
.into_iter()
.map(|insert| api::v1::flow::InsertRequest {
region_id: insert.region_id,
rows: insert.rows,
})
.collect(),
};
let response = client
.handle_mirror_request(requests)
.await
.map_err(|e| {
let code = e.code();
let err: crate::error::Error = e.into();
crate::error::Error::FlowServer {
addr,
code,
source: BoxedError::new(err),
location: location!(),
}
})?
.into_inner();
Ok(response)
}
}

View File

@@ -19,6 +19,7 @@ pub mod client_manager;
#[cfg(feature = "testing")]
mod database;
pub mod error;
pub mod flow;
pub mod load_balance;
mod metrics;
pub mod region;

View File

@@ -19,7 +19,7 @@ use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::DatanodeClients;
use client::client_manager::NodeClients;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
@@ -333,7 +333,7 @@ impl StartCommand {
timeout: None,
..Default::default()
};
let client = DatanodeClients::new(channel_config);
let client = NodeClients::new(channel_config);
let mut instance = FrontendBuilder::new(
cached_meta_backend.clone(),

View File

@@ -16,7 +16,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use client::client_manager::DatanodeClients;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_grpc::channel_manager::ChannelConfig;
@@ -272,7 +272,7 @@ impl MetasrvBuilder {
options.datanode.client_options.connect_timeout_millis,
))
.tcp_nodelay(options.datanode.client_options.tcp_nodelay);
Arc::new(DatanodeClients::new(datanode_client_channel_config))
Arc::new(NodeClients::new(datanode_client_channel_config))
});
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
mailbox.clone(),

View File

@@ -18,7 +18,7 @@ use std::time::Duration;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::procedure_service_server::ProcedureServiceServer;
use api::v1::meta::store_server::StoreServer;
use client::client_manager::DatanodeClients;
use client::client_manager::NodeClients;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::etcd::EtcdStore;
@@ -55,7 +55,7 @@ pub async fn mock(
opts: MetasrvOptions,
kv_backend: KvBackendRef,
selector: Option<SelectorRef>,
datanode_clients: Option<Arc<DatanodeClients>>,
datanode_clients: Option<Arc<NodeClients>>,
) -> MockInfo {
let server_addr = opts.server_addr.clone();
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

View File

@@ -19,7 +19,7 @@ use api::v1::meta::Partition;
use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef};
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType};
use client::client_manager::DatanodeClients;
use client::client_manager::NodeClients;
use common_catalog::consts::MITO2_ENGINE;
use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState};
use common_meta::ddl::create_table::*;
@@ -99,7 +99,7 @@ fn test_region_request_builder() {
let mut procedure = CreateTableProcedure::new(
1,
create_table_task(None),
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
test_data::new_ddl_context(Arc::new(NodeClients::default())),
);
procedure.set_allocated_metadata(
@@ -174,7 +174,7 @@ async fn new_node_manager(
region_server: &EchoRegionServer,
region_routes: &[RegionRoute],
) -> NodeManagerRef {
let clients = DatanodeClients::default();
let clients = NodeClients::default();
let datanodes = find_leaders(region_routes);
for datanode in datanodes {

View File

@@ -21,7 +21,7 @@ use api::v1::region::region_server::RegionServer;
use arrow_flight::flight_service_server::FlightServiceServer;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use client::client_manager::DatanodeClients;
use client::client_manager::NodeClients;
use client::Client;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -168,7 +168,7 @@ impl GreptimeDbClusterBuilder {
) -> GreptimeDbCluster {
let datanodes = datanode_options.len();
let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
let datanode_clients = Arc::new(DatanodeClients::new(channel_config));
let datanode_clients = Arc::new(NodeClients::new(channel_config));
let opt = MetasrvOptions {
procedure: ProcedureConfig {
@@ -338,7 +338,7 @@ impl GreptimeDbClusterBuilder {
async fn build_frontend(
&self,
metasrv: MockInfo,
datanode_clients: Arc<DatanodeClients>,
datanode_clients: Arc<NodeClients>,
) -> Arc<FeInstance> {
let mut meta_client = MetaClientBuilder::frontend_default_options(1000)
.channel_manager(metasrv.channel_manager)
@@ -401,7 +401,7 @@ impl GreptimeDbClusterBuilder {
}
async fn build_datanode_clients(
clients: Arc<DatanodeClients>,
clients: Arc<NodeClients>,
instances: &HashMap<DatanodeId, Datanode>,
datanodes: usize,
) {