refactor: make flownode gRPC services able to be added dynamically (#6323)

chore: enhance the flownode gRPC servers extension
This commit is contained in:
fys
2025-06-17 14:27:41 +08:00
committed by GitHub
parent 079daf5db9
commit 3e3a12385c
4 changed files with 35 additions and 51 deletions

View File

@@ -361,7 +361,7 @@ impl StartCommand {
let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
let services = FlownodeServiceBuilder::new(&opts)
.with_grpc_server(flownode.flownode_server().clone())
.with_default_grpc_server(flownode.flownode_server())
.enable_http_service()
.build()
.context(StartFlownodeSnafu)?;

View File

@@ -32,7 +32,7 @@ use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::tracing::info;
use futures::{FutureExt, TryStreamExt};
use futures::TryStreamExt;
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
use itertools::Itertools;
use operator::delete::Deleter;
@@ -40,16 +40,16 @@ use operator::insert::Inserter;
use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::add_service;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, oneshot, Mutex};
use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
@@ -231,50 +231,6 @@ impl FlownodeServer {
}
}
#[async_trait::async_trait]
impl servers::server::Server for FlownodeServer {
async fn shutdown(&self) -> Result<(), servers::error::Error> {
let tx = self.inner.server_shutdown_tx.lock().await;
if tx.send(()).is_err() {
info!("Receiver dropped, the flow node server has already shutdown");
}
info!("Shutdown flow node server");
Ok(())
}
async fn start(&mut self, addr: SocketAddr) -> Result<(), servers::error::Error> {
let mut rx_server = self.inner.server_shutdown_tx.lock().await.subscribe();
let incoming = {
let listener = TcpListener::bind(addr)
.await
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
let incoming =
TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
info!("flow server is bound to {}", addr);
incoming
};
let builder = tonic::transport::Server::builder().add_service(self.create_flow_service());
let _handle = common_runtime::spawn_global(async move {
let _result = builder
.serve_with_incoming_shutdown(incoming, rx_server.recv().map(drop))
.await
.context(StartGrpcSnafu);
});
Ok(())
}
fn name(&self) -> &str {
FLOW_NODE_SERVER_NAME
}
}
/// The flownode server instance.
pub struct FlownodeInstance {
flownode_server: FlownodeServer,
@@ -470,7 +426,7 @@ impl FlownodeBuilder {
/// Useful in distributed mode
pub struct FlownodeServiceBuilder<'a> {
opts: &'a FlownodeOptions,
grpc_server: Option<FlownodeServer>,
grpc_server: Option<GrpcServer>,
enable_http_service: bool,
}
@@ -490,13 +446,19 @@ impl<'a> FlownodeServiceBuilder<'a> {
}
}
pub fn with_grpc_server(self, grpc_server: FlownodeServer) -> Self {
pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
Self {
grpc_server: Some(grpc_server),
..self
}
}
pub fn with_default_grpc_server(mut self, flownode_server: &FlownodeServer) -> Self {
let grpc_server = Self::grpc_server_builder(self.opts, flownode_server).build();
self.grpc_server = Some(grpc_server);
self
}
pub fn build(mut self) -> Result<ServerHandlers, Error> {
let handlers = ServerHandlers::default();
if let Some(grpc_server) = self.grpc_server.take() {
@@ -519,6 +481,22 @@ impl<'a> FlownodeServiceBuilder<'a> {
}
Ok(handlers)
}
pub fn grpc_server_builder(
opts: &FlownodeOptions,
flownode_server: &FlownodeServer,
) -> GrpcServerBuilder {
let config = GrpcServerConfig {
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
};
let service = flownode_server.create_flow_service();
let runtime = common_runtime::global_runtime();
let mut builder = GrpcServerBuilder::new(config, runtime);
add_service!(builder, service);
builder
}
}
/// Basically a tiny frontend that communicates with datanode, different from [`FrontendClient`] which

View File

@@ -417,6 +417,7 @@ pub struct Metasrv {
meta_peer_client: MetaPeerClientRef,
// The selector is used to select a target datanode.
selector: SelectorRef,
selector_ctx: SelectorContext,
// The flow selector is used to select a target flownode.
flow_selector: SelectorRef,
handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
@@ -654,6 +655,10 @@ impl Metasrv {
&self.selector
}
pub fn selector_ctx(&self) -> &SelectorContext {
&self.selector_ctx
}
pub fn flow_selector(&self) -> &SelectorRef {
&self.flow_selector
}

View File

@@ -452,6 +452,7 @@ impl MetasrvBuilder {
leader_cached_kv_backend,
meta_peer_client: meta_peer_client.clone(),
selector,
selector_ctx,
// TODO(jeremy): We do not allow configuring the flow selector.
flow_selector,
handler_group: RwLock::new(None),