From 82f63735743b5ae13699582dace321ec57ec8893 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Tue, 25 Jun 2024 16:40:24 +0800 Subject: [PATCH] feat: FlownodeClient (#4206) * feat: FlownodeClient * chore: remove wrong doc * fix: debug impl for NodeClients * chore: rename `FlownodeClient` to `FlowRequester` --- src/client/src/client.rs | 11 +++ src/client/src/client_manager.rs | 20 +++--- src/client/src/error.rs | 12 +++- src/client/src/flow.rs | 104 ++++++++++++++++++++++++++++ src/client/src/lib.rs | 1 + src/cmd/src/frontend.rs | 4 +- src/meta-srv/src/metasrv/builder.rs | 4 +- src/meta-srv/src/mocks.rs | 4 +- src/meta-srv/src/procedure/tests.rs | 6 +- tests-integration/src/cluster.rs | 8 +-- 10 files changed, 151 insertions(+), 23 deletions(-) create mode 100644 src/client/src/flow.rs diff --git a/src/client/src/client.rs b/src/client/src/client.rs index e9dc07b230..429944d329 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::flow::flow_client::FlowClient as PbFlowClient; use api::v1::health_check_client::HealthCheckClient; use api::v1::prometheus_gateway_client::PrometheusGatewayClient; use api::v1::region::region_client::RegionClient as PbRegionClient; @@ -183,6 +184,16 @@ impl Client { Ok((addr, client)) } + pub(crate) fn raw_flow_client(&self) -> Result<(String, PbFlowClient)> { + let (addr, channel) = self.find_channel()?; + let client = PbFlowClient::new(channel) + .max_decoding_message_size(self.max_grpc_recv_message_size()) + .max_encoding_message_size(self.max_grpc_send_message_size()) + .accept_compressed(CompressionEncoding::Zstd) + .send_compressed(CompressionEncoding::Zstd); + Ok((addr, client)) + } + pub fn make_prometheus_gateway_client(&self) -> Result> { let (_, channel) = self.find_channel()?; let client = PrometheusGatewayClient::new(channel) diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index 528b86fe1f..adc284eaf1 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -21,43 +21,45 @@ use common_meta::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; use common_meta::peer::Peer; use moka::future::{Cache, CacheBuilder}; +use crate::flow::FlowRequester; use crate::region::RegionRequester; use crate::Client; -pub struct DatanodeClients { +pub struct NodeClients { channel_manager: ChannelManager, clients: Cache, } -impl Default for DatanodeClients { +impl Default for NodeClients { fn default() -> Self { Self::new(ChannelConfig::new()) } } -impl Debug for DatanodeClients { +impl Debug for NodeClients { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DatanodeClients") + f.debug_struct("NodeClients") .field("channel_manager", &self.channel_manager) .finish() } } #[async_trait::async_trait] -impl NodeManager for DatanodeClients { +impl NodeManager for NodeClients { async fn datanode(&self, datanode: &Peer) -> DatanodeRef { let client = self.get_client(datanode).await; Arc::new(RegionRequester::new(client)) } - async fn flownode(&self, _node: &Peer) -> FlownodeRef { - // TODO(weny): Support it. - unimplemented!() + async fn flownode(&self, flownode: &Peer) -> FlownodeRef { + let client = self.get_client(flownode).await; + + Arc::new(FlowRequester::new(client)) } } -impl DatanodeClients { +impl NodeClients { pub fn new(config: ChannelConfig) -> Self { Self { channel_manager: ChannelManager::with_config(config), diff --git a/src/client/src/error.rs b/src/client/src/error.rs index c8641087d4..7750295bde 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -98,6 +98,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to request FlowServer {}, code: {}", addr, code))] + FlowServer { + addr: String, + code: Code, + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + // Server error carried in Tonic Status's metadata. #[snafu(display("{}", msg))] Server { @@ -136,7 +145,8 @@ impl ErrorExt for Error { Error::Server { code, .. } => *code, Error::FlightGet { source, .. } | Error::HandleRequest { source, .. } - | Error::RegionServer { source, .. } => source.status_code(), + | Error::RegionServer { source, .. } + | Error::FlowServer { source, .. } => source.status_code(), Error::CreateChannel { source, .. } | Error::ConvertFlightData { source, .. } | Error::CreateTlsChannel { source, .. } => source.status_code(), diff --git a/src/client/src/flow.rs b/src/client/src/flow.rs new file mode 100644 index 0000000000..3d3f7aa557 --- /dev/null +++ b/src/client/src/flow.rs @@ -0,0 +1,104 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::flow::{FlowRequest, FlowResponse}; +use api::v1::region::InsertRequests; +use common_error::ext::BoxedError; +use common_meta::node_manager::Flownode; +use snafu::{location, Location, ResultExt}; + +use crate::error::Result; +use crate::Client; + +#[derive(Debug)] +pub struct FlowRequester { + client: Client, +} + +#[async_trait::async_trait] +impl Flownode for FlowRequester { + async fn handle(&self, request: FlowRequest) -> common_meta::error::Result { + self.handle_inner(request) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu) + } + + async fn handle_inserts( + &self, + request: InsertRequests, + ) -> common_meta::error::Result { + self.handle_inserts_inner(request) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu) + } +} + +impl FlowRequester { + pub fn new(client: Client) -> Self { + Self { client } + } + + async fn handle_inner(&self, request: FlowRequest) -> Result { + let (addr, mut client) = self.client.raw_flow_client()?; + + let response = client + .handle_create_remove(request) + .await + .map_err(|e| { + let code = e.code(); + let err: crate::error::Error = e.into(); + crate::error::Error::FlowServer { + addr, + code, + source: BoxedError::new(err), + location: location!(), + } + })? + .into_inner(); + Ok(response) + } + + async fn handle_inserts_inner(&self, request: InsertRequests) -> Result { + let (addr, mut client) = self.client.raw_flow_client()?; + + let requests = api::v1::flow::InsertRequests { + requests: request + .requests + .into_iter() + .map(|insert| api::v1::flow::InsertRequest { + region_id: insert.region_id, + rows: insert.rows, + }) + .collect(), + }; + + let response = client + .handle_mirror_request(requests) + .await + .map_err(|e| { + let code = e.code(); + let err: crate::error::Error = e.into(); + crate::error::Error::FlowServer { + addr, + code, + source: BoxedError::new(err), + location: location!(), + } + })? + .into_inner(); + Ok(response) + } +} diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 0741c8e1c7..3456a17489 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -19,6 +19,7 @@ pub mod client_manager; #[cfg(feature = "testing")] mod database; pub mod error; +pub mod flow; pub mod load_balance; mod metrics; pub mod region; diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 1b44f06f93..31f4a10848 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; -use client::client_manager::DatanodeClients; +use client::client_manager::NodeClients; use common_config::Configurable; use common_grpc::channel_manager::ChannelConfig; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; @@ -333,7 +333,7 @@ impl StartCommand { timeout: None, ..Default::default() }; - let client = DatanodeClients::new(channel_config); + let client = NodeClients::new(channel_config); let mut instance = FrontendBuilder::new( cached_meta_backend.clone(), diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 8759e7e975..844e2f786d 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -16,7 +16,7 @@ use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::time::Duration; -use client::client_manager::DatanodeClients; +use client::client_manager::NodeClients; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::ChannelConfig; @@ -272,7 +272,7 @@ impl MetasrvBuilder { options.datanode.client_options.connect_timeout_millis, )) .tcp_nodelay(options.datanode.client_options.tcp_nodelay); - Arc::new(DatanodeClients::new(datanode_client_channel_config)) + Arc::new(NodeClients::new(datanode_client_channel_config)) }); let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new( mailbox.clone(), diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index e9a3b58c8e..014d524927 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -18,7 +18,7 @@ use std::time::Duration; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::procedure_service_server::ProcedureServiceServer; use api::v1::meta::store_server::StoreServer; -use client::client_manager::DatanodeClients; +use client::client_manager::NodeClients; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::etcd::EtcdStore; @@ -55,7 +55,7 @@ pub async fn mock( opts: MetasrvOptions, kv_backend: KvBackendRef, selector: Option, - datanode_clients: Option>, + datanode_clients: Option>, ) -> MockInfo { let server_addr = opts.server_addr.clone(); let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 028df54110..4f1c97f14c 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -19,7 +19,7 @@ use api::v1::meta::Partition; use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{CreateRequest as PbCreateRegionRequest, RegionColumnDef}; use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, SemanticType}; -use client::client_manager::DatanodeClients; +use client::client_manager::NodeClients; use common_catalog::consts::MITO2_ENGINE; use common_meta::ddl::create_logical_tables::{CreateLogicalTablesProcedure, CreateTablesState}; use common_meta::ddl::create_table::*; @@ -99,7 +99,7 @@ fn test_region_request_builder() { let mut procedure = CreateTableProcedure::new( 1, create_table_task(None), - test_data::new_ddl_context(Arc::new(DatanodeClients::default())), + test_data::new_ddl_context(Arc::new(NodeClients::default())), ); procedure.set_allocated_metadata( @@ -174,7 +174,7 @@ async fn new_node_manager( region_server: &EchoRegionServer, region_routes: &[RegionRoute], ) -> NodeManagerRef { - let clients = DatanodeClients::default(); + let clients = NodeClients::default(); let datanodes = find_leaders(region_routes); for datanode in datanodes { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index c524e05523..eb5dc2d5cd 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -21,7 +21,7 @@ use api::v1::region::region_server::RegionServer; use arrow_flight::flight_service_server::FlightServiceServer; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; -use client::client_manager::DatanodeClients; +use client::client_manager::NodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -168,7 +168,7 @@ impl GreptimeDbClusterBuilder { ) -> GreptimeDbCluster { let datanodes = datanode_options.len(); let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20)); - let datanode_clients = Arc::new(DatanodeClients::new(channel_config)); + let datanode_clients = Arc::new(NodeClients::new(channel_config)); let opt = MetasrvOptions { procedure: ProcedureConfig { @@ -338,7 +338,7 @@ impl GreptimeDbClusterBuilder { async fn build_frontend( &self, metasrv: MockInfo, - datanode_clients: Arc, + datanode_clients: Arc, ) -> Arc { let mut meta_client = MetaClientBuilder::frontend_default_options(1000) .channel_manager(metasrv.channel_manager) @@ -401,7 +401,7 @@ impl GreptimeDbClusterBuilder { } async fn build_datanode_clients( - clients: Arc, + clients: Arc, instances: &HashMap, datanodes: usize, ) {