refactor(flow): make start flownode clearer (#5848)

refactor: make start flownode clearer
This commit is contained in:
discord9
2025-04-08 22:08:51 +08:00
committed by GitHub
parent 7ea04817bd
commit 72625958bf
5 changed files with 153 additions and 92 deletions

View File

@@ -32,7 +32,7 @@ use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use flow::{FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendInvoker};
use meta_client::{MetaClientOptions, MetaClientType};
use snafu::{ensure, OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
@@ -314,7 +314,7 @@ impl StartCommand {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
let flownode_builder = FlownodeBuilder::new(
opts,
opts.clone(),
Plugins::new(),
table_metadata_manager,
catalog_manager.clone(),
@@ -322,7 +322,15 @@ impl StartCommand {
)
.with_heartbeat_task(heartbeat_task);
let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
let services = FlownodeServiceBuilder::new(&opts)
.with_grpc_server(flownode.flownode_server().clone())
.enable_http_service()
.build()
.await
.context(StartFlownodeSnafu)?;
flownode.setup_services(services);
let flownode = flownode;
// flownode's frontend to datanode need not time out.
// Some queries are expected to take a long time.

View File

@@ -55,7 +55,10 @@ use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, Sto
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
use flow::{
FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeInstance, FlownodeOptions,
FrontendInvoker,
};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
@@ -74,10 +77,10 @@ use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use tokio::sync::{broadcast, RwLock};
use tokio::sync::RwLock;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::Result;
use crate::error::{Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{error, log_versions, App};
@@ -244,9 +247,7 @@ impl StandaloneOptions {
pub struct Instance {
datanode: Datanode,
frontend: Frontend,
// TODO(discord9): wrapped it in flownode instance instead
flow_worker_manager: Arc<FlowWorkerManager>,
flow_shutdown: broadcast::Sender<()>,
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
// Keep the logging guard to prevent the worker from being dropped.
@@ -288,9 +289,7 @@ impl App for Instance {
.await
.context(error::StartFrontendSnafu)?;
self.flow_worker_manager
.clone()
.run_background(Some(self.flow_shutdown.subscribe()));
self.flownode.start().await.context(StartFlownodeSnafu)?;
Ok(())
}
@@ -311,14 +310,9 @@ impl App for Instance {
.await
.context(error::ShutdownDatanodeSnafu)?;
self.flow_shutdown
.send(())
.map_err(|_e| {
flow::error::InternalSnafu {
reason: "Failed to send shutdown signal to flow worker manager, all receiver end already closed".to_string(),
}
.build()
})
self.flownode
.shutdown()
.await
.context(error::ShutdownFlownodeSnafu)?;
info!("Datanode instance stopped.");
@@ -536,13 +530,11 @@ impl StartCommand {
catalog_manager.clone(),
flow_metadata_manager.clone(),
);
let flownode = Arc::new(
flow_builder
.build()
.await
.map_err(BoxedError::new)
.context(error::OtherSnafu)?,
);
let flownode = flow_builder
.build()
.await
.map_err(BoxedError::new)
.context(error::OtherSnafu)?;
// set the ref to query for the local flow state
{
@@ -622,8 +614,6 @@ impl StartCommand {
.context(error::StartFlownodeSnafu)?;
flow_worker_manager.set_frontend_invoker(invoker).await;
let (tx, _rx) = broadcast::channel(1);
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(error::ServersSnafu)?;
@@ -642,8 +632,7 @@ impl StartCommand {
Ok(Instance {
datanode,
frontend,
flow_worker_manager,
flow_shutdown: tx,
flownode,
procedure_manager,
wal_options_allocator,
_guard: guard,

View File

@@ -21,7 +21,6 @@ use api::v1::flow::{
use api::v1::region::InsertRequests;
use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
use common_meta::node_manager::Flownode;
use common_telemetry::{debug, trace};
use datatypes::value::Value;
use itertools::Itertools;
@@ -46,7 +45,7 @@ fn to_meta_err(
}
#[async_trait::async_trait]
impl Flownode for FlowWorkerManager {
impl common_meta::node_manager::Flownode for FlowWorkerManager {
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse> {
let query_ctx = request
.header

View File

@@ -44,4 +44,6 @@ mod test_utils;
pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
pub use error::{Error, Result};
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker};
pub use server::{
FlownodeBuilder, FlownodeInstance, FlownodeServer, FlownodeServiceBuilder, FrontendInvoker,
};

View File

@@ -38,12 +38,12 @@ use operator::insert::Inserter;
use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
use servers::server::{ServerHandler, ServerHandlers};
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, oneshot, Mutex};
use tonic::codec::CompressionEncoding;
@@ -133,23 +133,55 @@ impl flow_server::Flow for FlowService {
}
}
#[derive(Clone)]
pub struct FlownodeServer {
shutdown_tx: Mutex<Option<broadcast::Sender<()>>>,
inner: Arc<FlownodeServerInner>,
}
/// State shared by every clone of `FlownodeServer`, which holds it behind an `Arc`.
///
/// It owns two independent shutdown channels: one for the background streaming workers
/// and one for the gRPC server task.
struct FlownodeServerInner {
/// worker shutdown signal, not to be confused with server_shutdown_tx
///
/// `start_workers` subscribes to it and hands the receiver to `run_background`;
/// `stop_workers` sends on it.
worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
/// server shutdown signal for shutdown grpc server
///
/// `Server::start` subscribes to it and `Server::shutdown` sends on it.
server_shutdown_tx: Mutex<broadcast::Sender<()>>,
/// The flow service; it is cloned into the gRPC `FlowServer` and its `manager` drives
/// the background workers.
flow_service: FlowService,
}
impl FlownodeServer {
pub fn new(flow_service: FlowService) -> Self {
let (tx, _rx) = broadcast::channel::<()>(1);
let (server_tx, _server_rx) = broadcast::channel::<()>(1);
Self {
flow_service,
shutdown_tx: Mutex::new(None),
inner: Arc::new(FlownodeServerInner {
flow_service,
worker_shutdown_tx: Mutex::new(tx),
server_shutdown_tx: Mutex::new(server_tx),
}),
}
}
/// Start the background task for streaming computation.
///
/// Subscribes a fresh receiver to the worker shutdown channel and passes it to the flow
/// manager's `run_background`, so that a later `stop_workers` call can signal the task.
///
/// The handle returned by `run_background` is not kept, so this method neither awaits nor
/// tracks the spawned work. Calling it more than once starts another background run each
/// time; nothing here guards against that.
///
/// # Errors
///
/// Currently never fails; the `Result` return type is kept for the callers that use `?`.
async fn start_workers(&self) -> Result<(), Error> {
    // Subscribe before starting the workers so no shutdown signal sent afterwards is missed.
    let shutdown_rx = self.inner.worker_shutdown_tx.lock().await.subscribe();
    // A single clone of the manager handle is enough; `run_background` is called on the
    // clone (presumably a by-value `Arc` receiver -- confirm against its signature).
    let manager = self.inner.flow_service.manager.clone();
    let _handle = manager.run_background(Some(shutdown_rx));
    Ok(())
}
/// Stop the background task for streaming computation.
///
/// Broadcasts a unit message on the worker shutdown channel. A failed send is only logged,
/// never returned as an error: it means no receiver is subscribed at the moment, i.e. there
/// is no running worker task left to stop.
///
/// # Errors
///
/// Currently never fails; the `Result` return type is kept for the callers that use `?`.
async fn stop_workers(&self) -> Result<(), Error> {
    let worker_tx = self.inner.worker_shutdown_tx.lock().await;
    if worker_tx.send(()).is_err() {
        // Name the workers explicitly: this is the worker channel, not the gRPC server
        // channel, and the two shutdown paths should be distinguishable in the logs.
        info!("Receiver dropped, the flow workers are not running (never started or already shut down)");
    }
    Ok(())
}
}
impl FlownodeServer {
pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
flow_server::FlowServer::new(self.flow_service.clone())
flow_server::FlowServer::new(self.inner.flow_service.clone())
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
@@ -160,25 +192,19 @@ impl FlownodeServer {
#[async_trait::async_trait]
impl servers::server::Server for FlownodeServer {
async fn shutdown(&self) -> Result<(), servers::error::Error> {
let mut shutdown_tx = self.shutdown_tx.lock().await;
if let Some(tx) = shutdown_tx.take() {
if tx.send(()).is_err() {
info!("Receiver dropped, the flow node server has already shutdown");
}
let tx = self.inner.server_shutdown_tx.lock().await;
if tx.send(()).is_err() {
info!("Receiver dropped, the flow node server has already shutdown");
}
info!("Shutdown flow node server");
Ok(())
}
async fn start(&self, addr: SocketAddr) -> Result<SocketAddr, servers::error::Error> {
let (tx, rx) = broadcast::channel::<()>(1);
let mut rx_server = tx.subscribe();
let mut rx_server = self.inner.server_shutdown_tx.lock().await.subscribe();
let (incoming, addr) = {
let mut shutdown_tx = self.shutdown_tx.lock().await;
ensure!(
shutdown_tx.is_none(),
AlreadyStartedSnafu { server: "flow" }
);
let listener = TcpListener::bind(addr)
.await
.context(TcpBindSnafu { addr })?;
@@ -187,8 +213,6 @@ impl servers::server::Server for FlownodeServer {
TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
info!("flow server is bound to {}", addr);
*shutdown_tx = Some(tx);
(incoming, addr)
};
@@ -201,9 +225,6 @@ impl servers::server::Server for FlownodeServer {
.context(StartGrpcSnafu);
});
let manager_ref = self.flow_service.manager.clone();
let _handle = manager_ref.clone().run_background(Some(rx));
Ok(addr)
}
@@ -214,11 +235,8 @@ impl servers::server::Server for FlownodeServer {
/// The flownode server instance.
pub struct FlownodeInstance {
server: FlownodeServer,
addr: SocketAddr,
/// only used for health check
http_server: HttpServer,
http_addr: SocketAddr,
flownode_server: FlownodeServer,
services: ServerHandlers,
heartbeat_task: Option<HeartbeatTask>,
}
@@ -228,36 +246,37 @@ impl FlownodeInstance {
task.start().await?;
}
self.addr = self
.server
.start(self.addr)
.await
.context(StartServerSnafu)?;
self.flownode_server.start_workers().await?;
self.http_server
.start(self.http_addr)
.await
.context(StartServerSnafu)?;
self.services.start_all().await.context(StartServerSnafu)?;
Ok(())
}
pub async fn shutdown(&self) -> Result<(), crate::Error> {
self.server.shutdown().await.context(ShutdownServerSnafu)?;
self.services
.shutdown_all()
.await
.context(ShutdownServerSnafu)?;
self.flownode_server.stop_workers().await?;
if let Some(task) = &self.heartbeat_task {
task.shutdown();
}
self.http_server
.shutdown()
.await
.context(ShutdownServerSnafu)?;
Ok(())
}
/// Returns a reference to the `FlownodeServer` owned by this instance.
///
/// `FlownodeServer` is `Clone`, so a caller that needs its own handle (for example to
/// register it as a gRPC service) can clone the returned reference.
pub fn flownode_server(&self) -> &FlownodeServer {
&self.flownode_server
}
pub fn flow_worker_manager(&self) -> FlowWorkerManagerRef {
self.server.flow_service.manager.clone()
self.flownode_server.inner.flow_service.manager.clone()
}
/// Replaces the server handlers managed by this instance with `services`.
///
/// The previously stored handlers are overwritten, not merged. The stored handlers are the
/// ones that `start` launches via `start_all` and `shutdown` stops via `shutdown_all`.
pub fn setup_services(&mut self, services: ServerHandlers) {
self.services = services;
}
}
@@ -325,21 +344,11 @@ impl FlownodeBuilder {
let server = FlownodeServer::new(FlowService::new(manager.clone()));
let http_addr = self.opts.http.addr.parse().context(ParseAddrSnafu {
addr: self.opts.http.addr.clone(),
})?;
let http_server = HttpServerBuilder::new(self.opts.http)
.with_metrics_handler(MetricsHandler)
.build();
let heartbeat_task = self.heartbeat_task;
let addr = self.opts.grpc.bind_addr;
let instance = FlownodeInstance {
server,
addr: addr.parse().context(ParseAddrSnafu { addr })?,
http_server,
http_addr,
flownode_server: server,
services: ServerHandlers::new(),
heartbeat_task,
};
Ok(instance)
@@ -475,6 +484,60 @@ impl FlownodeBuilder {
}
}
/// Builder that assembles the `ServerHandlers` (gRPC and, optionally, HTTP) for a flownode.
///
/// Useful in distributed mode, where the flownode start command builds the handlers and
/// installs them on the instance with `setup_services`.
pub struct FlownodeServiceBuilder<'a> {
/// Options providing the gRPC bind address and the HTTP server configuration.
opts: &'a FlownodeOptions,
/// gRPC server to register; no gRPC handler is added while this is `None`.
grpc_server: Option<FlownodeServer>,
/// Whether `build` should also create and register an HTTP server.
enable_http_service: bool,
}
impl<'a> FlownodeServiceBuilder<'a> {
pub fn new(opts: &'a FlownodeOptions) -> Self {
Self {
opts,
grpc_server: None,
enable_http_service: false,
}
}
pub fn enable_http_service(self) -> Self {
Self {
enable_http_service: true,
..self
}
}
pub fn with_grpc_server(self, grpc_server: FlownodeServer) -> Self {
Self {
grpc_server: Some(grpc_server),
..self
}
}
pub async fn build(mut self) -> Result<ServerHandlers, Error> {
let handlers = ServerHandlers::default();
if let Some(grpc_server) = self.grpc_server.take() {
let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
addr: &self.opts.grpc.bind_addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
handlers.insert(handler).await;
}
if self.enable_http_service {
let http_server = HttpServerBuilder::new(self.opts.http.clone())
.with_metrics_handler(MetricsHandler)
.build();
let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
let handler: ServerHandler = (Box::new(http_server), addr);
handlers.insert(handler).await;
}
Ok(handlers)
}
}
#[derive(Clone)]
pub struct FrontendInvoker {
inserter: Arc<Inserter>,