diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index e62cac7de8..bd0f276c10 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -361,7 +361,7 @@ impl StartCommand { let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?; let services = FlownodeServiceBuilder::new(&opts) - .with_grpc_server(flownode.flownode_server().clone()) + .with_default_grpc_server(flownode.flownode_server()) .enable_http_service() .build() .context(StartFlownodeSnafu)?; diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 5711f4ada6..d065253d9b 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -32,7 +32,7 @@ use common_meta::node_manager::{Flownode, NodeManagerRef}; use common_query::Output; use common_runtime::JoinHandle; use common_telemetry::tracing::info; -use futures::{FutureExt, TryStreamExt}; +use futures::TryStreamExt; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; use operator::delete::Deleter; @@ -40,16 +40,16 @@ use operator::insert::Inserter; use operator::statement::StatementExecutor; use partition::manager::PartitionRuleManager; use query::{QueryEngine, QueryEngineFactory}; -use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; +use servers::add_service; +use servers::grpc::builder::GrpcServerBuilder; +use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; use servers::server::{ServerHandler, ServerHandlers}; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; -use tokio::net::TcpListener; use tokio::sync::{broadcast, oneshot, Mutex}; use tonic::codec::CompressionEncoding; -use tonic::transport::server::TcpIncoming; use tonic::{Request, Response, Status}; use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef}; @@ -231,50 +231,6 @@ impl FlownodeServer { } } -#[async_trait::async_trait] -impl servers::server::Server for FlownodeServer { - async fn shutdown(&self) -> Result<(), servers::error::Error> { - let tx = self.inner.server_shutdown_tx.lock().await; - if tx.send(()).is_err() { - info!("Receiver dropped, the flow node server has already shutdown"); - } - info!("Shutdown flow node server"); - - Ok(()) - } - - async fn start(&mut self, addr: SocketAddr) -> Result<(), servers::error::Error> { - let mut rx_server = self.inner.server_shutdown_tx.lock().await.subscribe(); - - let incoming = { - let listener = TcpListener::bind(addr) - .await - .context(TcpBindSnafu { addr })?; - let addr = listener.local_addr().context(TcpBindSnafu { addr })?; - let incoming = - TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?; - info!("flow server is bound to {}", addr); - - incoming - }; - - let builder = tonic::transport::Server::builder().add_service(self.create_flow_service()); - - let _handle = common_runtime::spawn_global(async move { - let _result = builder - .serve_with_incoming_shutdown(incoming, rx_server.recv().map(drop)) - .await - .context(StartGrpcSnafu); - }); - - Ok(()) - } - - fn name(&self) -> &str { - FLOW_NODE_SERVER_NAME - } -} - /// The flownode server instance. pub struct FlownodeInstance { flownode_server: FlownodeServer, @@ -470,7 +426,7 @@ impl FlownodeBuilder { /// Useful in distributed mode pub struct FlownodeServiceBuilder<'a> { opts: &'a FlownodeOptions, - grpc_server: Option, + grpc_server: Option, enable_http_service: bool, } @@ -490,13 +446,19 @@ impl<'a> FlownodeServiceBuilder<'a> { } } - pub fn with_grpc_server(self, grpc_server: FlownodeServer) -> Self { + pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self { Self { grpc_server: Some(grpc_server), ..self } } + pub fn with_default_grpc_server(mut self, flownode_server: &FlownodeServer) -> Self { + let grpc_server = Self::grpc_server_builder(self.opts, flownode_server).build(); + self.grpc_server = Some(grpc_server); + self + } + pub fn build(mut self) -> Result { let handlers = ServerHandlers::default(); if let Some(grpc_server) = self.grpc_server.take() { @@ -519,6 +481,22 @@ impl<'a> FlownodeServiceBuilder<'a> { } Ok(handlers) } + + pub fn grpc_server_builder( + opts: &FlownodeOptions, + flownode_server: &FlownodeServer, + ) -> GrpcServerBuilder { + let config = GrpcServerConfig { + max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize, + max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize, + tls: opts.grpc.tls.clone(), + }; + let service = flownode_server.create_flow_service(); + let runtime = common_runtime::global_runtime(); + let mut builder = GrpcServerBuilder::new(config, runtime); + add_service!(builder, service); + builder + } } /// Basically a tiny frontend that communicates with datanode, different from [`FrontendClient`] which diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 479af1dc07..50797d44c4 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -417,6 +417,7 @@ pub struct Metasrv { meta_peer_client: MetaPeerClientRef, // The selector is used to select a target datanode. selector: SelectorRef, + selector_ctx: SelectorContext, // The flow selector is used to select a target flownode. flow_selector: SelectorRef, handler_group: RwLock>, @@ -654,6 +655,10 @@ impl Metasrv { &self.selector } + pub fn selector_ctx(&self) -> &SelectorContext { + &self.selector_ctx + } + pub fn flow_selector(&self) -> &SelectorRef { &self.flow_selector } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index fa9f24efa6..167c5afd8e 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -452,6 +452,7 @@ impl MetasrvBuilder { leader_cached_kv_backend, meta_peer_client: meta_peer_client.clone(), selector, + selector_ctx, // TODO(jeremy): We do not allow configuring the flow selector. flow_selector, handler_group: RwLock::new(None),