diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 667fe437d9..fc23d37c23 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -32,7 +32,7 @@ use common_meta::key::TableMetadataManager; use common_telemetry::info; use common_telemetry::logging::TracingOptions; use common_version::{short_version, version}; -use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker}; +use flow::{FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendInvoker}; use meta_client::{MetaClientOptions, MetaClientType}; use snafu::{ensure, OptionExt, ResultExt}; use tracing_appender::non_blocking::WorkerGuard; @@ -314,7 +314,7 @@ impl StartCommand { let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone())); let flownode_builder = FlownodeBuilder::new( - opts, + opts.clone(), Plugins::new(), table_metadata_manager, catalog_manager.clone(), @@ -322,7 +322,15 @@ impl StartCommand { ) .with_heartbeat_task(heartbeat_task); - let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?; + let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?; + let services = FlownodeServiceBuilder::new(&opts) + .with_grpc_server(flownode.flownode_server().clone()) + .enable_http_service() + .build() + .await + .context(StartFlownodeSnafu)?; + flownode.setup_services(services); + let flownode = flownode; // flownode's frontend to datanode need not timeout. // Some queries are expected to take long time. diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 237db63a5b..4504927cc8 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -55,7 +55,10 @@ use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, Sto use datanode::datanode::{Datanode, DatanodeBuilder}; use datanode::region_server::RegionServer; use file_engine::config::EngineConfig as FileEngineConfig; -use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker}; +use flow::{ + FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeInstance, FlownodeOptions, + FrontendInvoker, +}; use frontend::frontend::{Frontend, FrontendOptions}; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager}; @@ -74,10 +77,10 @@ use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; use snafu::ResultExt; -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::RwLock; use tracing_appender::non_blocking::WorkerGuard; -use crate::error::Result; +use crate::error::{Result, StartFlownodeSnafu}; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{error, log_versions, App}; @@ -244,9 +247,7 @@ impl StandaloneOptions { pub struct Instance { datanode: Datanode, frontend: Frontend, - // TODO(discord9): wrapped it in flownode instance instead - flow_worker_manager: Arc, - flow_shutdown: broadcast::Sender<()>, + flownode: FlownodeInstance, procedure_manager: ProcedureManagerRef, wal_options_allocator: WalOptionsAllocatorRef, // Keep the logging guard to prevent the worker from being dropped. @@ -288,9 +289,7 @@ impl App for Instance { .await .context(error::StartFrontendSnafu)?; - self.flow_worker_manager - .clone() - .run_background(Some(self.flow_shutdown.subscribe())); + self.flownode.start().await.context(StartFlownodeSnafu)?; Ok(()) } @@ -311,14 +310,9 @@ impl App for Instance { .await .context(error::ShutdownDatanodeSnafu)?; - self.flow_shutdown - .send(()) - .map_err(|_e| { - flow::error::InternalSnafu { - reason: "Failed to send shutdown signal to flow worker manager, all receiver end already closed".to_string(), - } - .build() - }) + self.flownode + .shutdown() + .await .context(error::ShutdownFlownodeSnafu)?; info!("Datanode instance stopped."); @@ -536,13 +530,11 @@ impl StartCommand { catalog_manager.clone(), flow_metadata_manager.clone(), ); - let flownode = Arc::new( - flow_builder - .build() - .await - .map_err(BoxedError::new) - .context(error::OtherSnafu)?, - ); + let flownode = flow_builder + .build() + .await + .map_err(BoxedError::new) + .context(error::OtherSnafu)?; // set the ref to query for the local flow state { @@ -622,8 +614,6 @@ impl StartCommand { .context(error::StartFlownodeSnafu)?; flow_worker_manager.set_frontend_invoker(invoker).await; - let (tx, _rx) = broadcast::channel(1); - let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins)) .context(error::ServersSnafu)?; @@ -642,8 +632,7 @@ impl StartCommand { Ok(Instance { datanode, frontend, - flow_worker_manager, - flow_shutdown: tx, + flownode, procedure_manager, wal_options_allocator, _guard: guard, diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 0c316d9d0f..1daec77fbd 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -21,7 +21,6 @@ use api::v1::flow::{ use api::v1::region::InsertRequests; use common_error::ext::BoxedError; use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; -use common_meta::node_manager::Flownode; use common_telemetry::{debug, trace}; use datatypes::value::Value; use itertools::Itertools; @@ -46,7 +45,7 @@ fn to_meta_err( } #[async_trait::async_trait] -impl Flownode for FlowWorkerManager { +impl common_meta::node_manager::Flownode for FlowWorkerManager { async fn handle(&self, request: FlowRequest) -> Result { let query_ctx = request .header diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 8dcf29dee9..f6af31f8eb 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -44,4 +44,6 @@ mod test_utils; pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions}; pub use error::{Error, Result}; -pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker}; +pub use server::{ + FlownodeBuilder, FlownodeInstance, FlownodeServer, FlownodeServiceBuilder, FrontendInvoker, +}; diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 742359818a..f347ac369e 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -38,12 +38,12 @@ use operator::insert::Inserter; use operator::statement::StatementExecutor; use partition::manager::PartitionRuleManager; use query::{QueryEngine, QueryEngineFactory}; -use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; -use servers::http::{HttpServer, HttpServerBuilder}; +use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; +use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; -use servers::server::Server; +use servers::server::{ServerHandler, ServerHandlers}; use session::context::{QueryContextBuilder, QueryContextRef}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::{broadcast, oneshot, Mutex}; use tonic::codec::CompressionEncoding; @@ -133,23 +133,55 @@ impl flow_server::Flow for FlowService { } } +#[derive(Clone)] pub struct FlownodeServer { - shutdown_tx: Mutex>>, + inner: Arc, +} + +struct FlownodeServerInner { + /// worker shutdown signal, not to be confused with server_shutdown_tx + worker_shutdown_tx: Mutex>, + /// server shutdown signal for shutdown grpc server + server_shutdown_tx: Mutex>, flow_service: FlowService, } impl FlownodeServer { pub fn new(flow_service: FlowService) -> Self { + let (tx, _rx) = broadcast::channel::<()>(1); + let (server_tx, _server_rx) = broadcast::channel::<()>(1); Self { - flow_service, - shutdown_tx: Mutex::new(None), + inner: Arc::new(FlownodeServerInner { + flow_service, + worker_shutdown_tx: Mutex::new(tx), + server_shutdown_tx: Mutex::new(server_tx), + }), } } + + /// Start the background task for streaming computation. + async fn start_workers(&self) -> Result<(), Error> { + let manager_ref = self.inner.flow_service.manager.clone(); + let _handle = manager_ref + .clone() + .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe())); + + Ok(()) + } + + /// Stop the background task for streaming computation. + async fn stop_workers(&self) -> Result<(), Error> { + let tx = self.inner.worker_shutdown_tx.lock().await; + if tx.send(()).is_err() { + info!("Receiver dropped, the flow node server has already shutdown"); + } + Ok(()) + } } impl FlownodeServer { pub fn create_flow_service(&self) -> flow_server::FlowServer { - flow_server::FlowServer::new(self.flow_service.clone()) + flow_server::FlowServer::new(self.inner.flow_service.clone()) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip) .accept_compressed(CompressionEncoding::Zstd) @@ -160,25 +192,19 @@ impl FlownodeServer { #[async_trait::async_trait] impl servers::server::Server for FlownodeServer { async fn shutdown(&self) -> Result<(), servers::error::Error> { - let mut shutdown_tx = self.shutdown_tx.lock().await; - if let Some(tx) = shutdown_tx.take() { - if tx.send(()).is_err() { - info!("Receiver dropped, the flow node server has already shutdown"); - } + let tx = self.inner.server_shutdown_tx.lock().await; + if tx.send(()).is_err() { + info!("Receiver dropped, the flow node server has already shutdown"); } info!("Shutdown flow node server"); Ok(()) } + async fn start(&self, addr: SocketAddr) -> Result { - let (tx, rx) = broadcast::channel::<()>(1); - let mut rx_server = tx.subscribe(); + let mut rx_server = self.inner.server_shutdown_tx.lock().await.subscribe(); + let (incoming, addr) = { - let mut shutdown_tx = self.shutdown_tx.lock().await; - ensure!( - shutdown_tx.is_none(), - AlreadyStartedSnafu { server: "flow" } - ); let listener = TcpListener::bind(addr) .await .context(TcpBindSnafu { addr })?; @@ -187,8 +213,6 @@ impl servers::server::Server for FlownodeServer { TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?; info!("flow server is bound to {}", addr); - *shutdown_tx = Some(tx); - (incoming, addr) }; @@ -201,9 +225,6 @@ impl servers::server::Server for FlownodeServer { .context(StartGrpcSnafu); }); - let manager_ref = self.flow_service.manager.clone(); - let _handle = manager_ref.clone().run_background(Some(rx)); - Ok(addr) } @@ -214,11 +235,8 @@ impl servers::server::Server for FlownodeServer { /// The flownode server instance. pub struct FlownodeInstance { - server: FlownodeServer, - addr: SocketAddr, - /// only used for health check - http_server: HttpServer, - http_addr: SocketAddr, + flownode_server: FlownodeServer, + services: ServerHandlers, heartbeat_task: Option, } @@ -228,36 +246,37 @@ impl FlownodeInstance { task.start().await?; } - self.addr = self - .server - .start(self.addr) - .await - .context(StartServerSnafu)?; + self.flownode_server.start_workers().await?; - self.http_server - .start(self.http_addr) - .await - .context(StartServerSnafu)?; + self.services.start_all().await.context(StartServerSnafu)?; Ok(()) } pub async fn shutdown(&self) -> Result<(), crate::Error> { - self.server.shutdown().await.context(ShutdownServerSnafu)?; + self.services + .shutdown_all() + .await + .context(ShutdownServerSnafu)?; + + self.flownode_server.stop_workers().await?; if let Some(task) = &self.heartbeat_task { task.shutdown(); } - self.http_server - .shutdown() - .await - .context(ShutdownServerSnafu)?; - Ok(()) } + pub fn flownode_server(&self) -> &FlownodeServer { + &self.flownode_server + } + pub fn flow_worker_manager(&self) -> FlowWorkerManagerRef { - self.server.flow_service.manager.clone() + self.flownode_server.inner.flow_service.manager.clone() + } + + pub fn setup_services(&mut self, services: ServerHandlers) { + self.services = services; } } @@ -325,21 +344,11 @@ impl FlownodeBuilder { let server = FlownodeServer::new(FlowService::new(manager.clone())); - let http_addr = self.opts.http.addr.parse().context(ParseAddrSnafu { - addr: self.opts.http.addr.clone(), - })?; - let http_server = HttpServerBuilder::new(self.opts.http) - .with_metrics_handler(MetricsHandler) - .build(); - let heartbeat_task = self.heartbeat_task; - let addr = self.opts.grpc.bind_addr; let instance = FlownodeInstance { - server, - addr: addr.parse().context(ParseAddrSnafu { addr })?, - http_server, - http_addr, + flownode_server: server, + services: ServerHandlers::new(), heartbeat_task, }; Ok(instance) @@ -475,6 +484,60 @@ impl FlownodeBuilder { } } +/// Useful in distributed mode +pub struct FlownodeServiceBuilder<'a> { + opts: &'a FlownodeOptions, + grpc_server: Option, + enable_http_service: bool, +} + +impl<'a> FlownodeServiceBuilder<'a> { + pub fn new(opts: &'a FlownodeOptions) -> Self { + Self { + opts, + grpc_server: None, + enable_http_service: false, + } + } + + pub fn enable_http_service(self) -> Self { + Self { + enable_http_service: true, + ..self + } + } + + pub fn with_grpc_server(self, grpc_server: FlownodeServer) -> Self { + Self { + grpc_server: Some(grpc_server), + ..self + } + } + + pub async fn build(mut self) -> Result { + let handlers = ServerHandlers::default(); + if let Some(grpc_server) = self.grpc_server.take() { + let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu { + addr: &self.opts.grpc.bind_addr, + })?; + let handler: ServerHandler = (Box::new(grpc_server), addr); + handlers.insert(handler).await; + } + + if self.enable_http_service { + let http_server = HttpServerBuilder::new(self.opts.http.clone()) + .with_metrics_handler(MetricsHandler) + .build(); + let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu { + addr: &self.opts.http.addr, + })?; + let handler: ServerHandler = (Box::new(http_server), addr); + handlers.insert(handler).await; + } + Ok(handlers) + } +} + #[derive(Clone)] pub struct FrontendInvoker { inserter: Arc,