1use std::net::SocketAddr;
18use std::sync::Arc;
19
20use api::v1::flow::DirtyWindowRequests;
21use api::v1::{RowDeleteRequests, RowInsertRequests};
22use cache::{PARTITION_INFO_CACHE_NAME, TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_error::ext::BoxedError;
26use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
27use common_meta::key::TableMetadataManagerRef;
28use common_meta::key::flow::FlowMetadataManagerRef;
29use common_meta::kv_backend::KvBackendRef;
30use common_meta::node_manager::{Flownode, NodeManagerRef};
31use common_meta::procedure_executor::ProcedureExecutorRef;
32use common_query::Output;
33use common_runtime::JoinHandle;
34use common_telemetry::tracing::info;
35use futures::TryStreamExt;
36use greptime_proto::v1::flow::{FlowRequest, FlowResponse, InsertRequests, flow_server};
37use itertools::Itertools;
38use operator::delete::Deleter;
39use operator::insert::Inserter;
40use operator::statement::StatementExecutor;
41use partition::cache::PartitionInfoCacheRef;
42use partition::manager::PartitionRuleManager;
43use query::{QueryEngine, QueryEngineFactory};
44use servers::add_service;
45use servers::grpc::builder::GrpcServerBuilder;
46use servers::grpc::{GrpcServer, GrpcServerConfig};
47use servers::http::HttpServerBuilder;
48use servers::metrics_handler::MetricsHandler;
49use servers::server::{ServerHandler, ServerHandlers};
50use session::context::QueryContextRef;
51use snafu::{OptionExt, ResultExt};
52use tokio::sync::{Mutex, broadcast, oneshot};
53use tonic::codec::CompressionEncoding;
54use tonic::{Request, Response, Status};
55
56use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
57use crate::adapter::{FlowStreamingEngineRef, create_worker};
58use crate::batching_mode::engine::BatchingEngine;
59use crate::error::{
60 CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu,
61 ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
62};
63use crate::heartbeat::HeartbeatTask;
64use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
65use crate::transform::register_function_to_query_engine;
66use crate::utils::{SizeReportSender, StateReportHandler};
67use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
68
/// Name constant for the flownode server.
///
/// NOTE(review): not referenced in the visible part of this file — confirm where it is
/// consumed before changing the value.
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
70#[derive(Clone)]
72pub struct FlowService {
73 pub dual_engine: FlowDualEngineRef,
74}
75
76impl FlowService {
77 pub fn new(manager: FlowDualEngineRef) -> Self {
78 Self {
79 dual_engine: manager,
80 }
81 }
82}
83
84#[async_trait::async_trait]
85impl flow_server::Flow for FlowService {
86 async fn handle_create_remove(
87 &self,
88 request: Request<FlowRequest>,
89 ) -> Result<Response<FlowResponse>, Status> {
90 let _timer = METRIC_FLOW_PROCESSING_TIME
91 .with_label_values(&["ddl"])
92 .start_timer();
93
94 let request = request.into_inner();
95 self.dual_engine
96 .handle(request)
97 .await
98 .map_err(|err| {
99 common_telemetry::error!(err; "Failed to handle flow request");
100 err
101 })
102 .map(Response::new)
103 .map_err(to_status_with_last_err)
104 }
105
106 async fn handle_mirror_request(
107 &self,
108 request: Request<InsertRequests>,
109 ) -> Result<Response<FlowResponse>, Status> {
110 let _timer = METRIC_FLOW_PROCESSING_TIME
111 .with_label_values(&["insert"])
112 .start_timer();
113
114 let request = request.into_inner();
115 let mut row_count = 0;
117 let request = api::v1::region::InsertRequests {
118 requests: request
119 .requests
120 .into_iter()
121 .map(|insert| {
122 insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
123 api::v1::region::InsertRequest {
124 region_id: insert.region_id,
125 rows: insert.rows,
126 partition_expr_version: insert.partition_expr_version,
127 }
128 })
129 .collect_vec(),
130 };
131
132 METRIC_FLOW_ROWS
133 .with_label_values(&["in"])
134 .inc_by(row_count as u64);
135
136 self.dual_engine
137 .handle_inserts(request)
138 .await
139 .map(Response::new)
140 .map_err(to_status_with_last_err)
141 }
142
143 async fn handle_mark_dirty_time_window(
144 &self,
145 reqs: Request<DirtyWindowRequests>,
146 ) -> Result<Response<FlowResponse>, Status> {
147 self.dual_engine
148 .handle_mark_window_dirty(reqs.into_inner())
149 .await
150 .map(Response::new)
151 .map_err(to_status_with_last_err)
152 }
153}
154
155#[derive(Clone)]
156pub struct FlownodeServer {
157 inner: Arc<FlownodeServerInner>,
158}
159
160struct FlownodeServerInner {
164 worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
166 server_shutdown_tx: Mutex<broadcast::Sender<()>>,
168 streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
170 state_report_task_handler: Mutex<Option<JoinHandle<()>>>,
172 flow_service: FlowService,
173}
174
175impl FlownodeServer {
176 pub fn new(flow_service: FlowService) -> Self {
177 let (tx, _rx) = broadcast::channel::<()>(1);
178 let (server_tx, _server_rx) = broadcast::channel::<()>(1);
179 Self {
180 inner: Arc::new(FlownodeServerInner {
181 flow_service,
182 worker_shutdown_tx: Mutex::new(tx),
183 server_shutdown_tx: Mutex::new(server_tx),
184 streaming_task_handler: Mutex::new(None),
185 state_report_task_handler: Mutex::new(None),
186 }),
187 }
188 }
189
190 async fn start_workers(&self) -> Result<(), Error> {
194 let manager_ref = self.inner.flow_service.dual_engine.clone();
195 let mut state_report_task_handler = self.inner.state_report_task_handler.lock().await;
196 if state_report_task_handler.is_none() {
197 *state_report_task_handler = manager_ref.clone().start_state_report_task().await;
198 }
199 drop(state_report_task_handler);
200 let handle = manager_ref
201 .streaming_engine()
202 .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
203 self.inner
204 .streaming_task_handler
205 .lock()
206 .await
207 .replace(handle);
208
209 self.inner
210 .flow_service
211 .dual_engine
212 .start_flow_consistent_check_task()
213 .await?;
214
215 Ok(())
216 }
217
218 async fn stop_workers(&self) -> Result<(), Error> {
220 let tx = self.inner.worker_shutdown_tx.lock().await;
221 if tx.send(()).is_err() {
222 info!("Receiver dropped, the flow node server has already shutdown");
223 }
224 self.inner
227 .flow_service
228 .dual_engine
229 .stop_flow_consistent_check_task()
230 .await?;
231 Ok(())
232 }
233}
234
235impl FlownodeServer {
236 pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
237 flow_server::FlowServer::new(self.inner.flow_service.clone())
238 .accept_compressed(CompressionEncoding::Gzip)
239 .send_compressed(CompressionEncoding::Gzip)
240 .accept_compressed(CompressionEncoding::Zstd)
241 .send_compressed(CompressionEncoding::Zstd)
242 }
243}
244
245pub struct FlownodeInstance {
247 flownode_server: FlownodeServer,
248 services: ServerHandlers,
249 heartbeat_task: Option<HeartbeatTask>,
250}
251
252impl FlownodeInstance {
253 pub async fn start(&mut self) -> Result<(), crate::Error> {
254 if let Some(task) = &self.heartbeat_task {
255 task.start().await?;
256 }
257
258 self.flownode_server.start_workers().await?;
259
260 self.services.start_all().await.context(StartServerSnafu)?;
261
262 Ok(())
263 }
264 pub async fn shutdown(&mut self) -> Result<(), Error> {
265 self.services
266 .shutdown_all()
267 .await
268 .context(ShutdownServerSnafu)?;
269
270 self.flownode_server.stop_workers().await?;
271
272 if let Some(task) = &self.heartbeat_task {
273 task.shutdown();
274 }
275
276 Ok(())
277 }
278
279 pub fn flownode_server(&self) -> &FlownodeServer {
280 &self.flownode_server
281 }
282
283 pub fn flow_engine(&self) -> FlowDualEngineRef {
284 self.flownode_server.inner.flow_service.dual_engine.clone()
285 }
286
287 pub fn setup_services(&mut self, services: ServerHandlers) {
288 self.services = services;
289 }
290}
291
292pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
293 if let Some(user_provider) = fn_opts.user_provider.as_ref() {
294 let static_provider = auth::static_user_provider_from_option(user_provider)
295 .context(IllegalAuthConfigSnafu)?;
296
297 let (usr, pwd) = static_provider
298 .get_one_user_pwd()
299 .context(IllegalAuthConfigSnafu)?;
300 let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
301 return Ok(Some(auth_header));
302 }
303
304 Ok(None)
305}
306
307pub struct FlownodeBuilder {
309 opts: FlownodeOptions,
310 plugins: Plugins,
311 table_meta: TableMetadataManagerRef,
312 catalog_manager: CatalogManagerRef,
313 flow_metadata_manager: FlowMetadataManagerRef,
314 heartbeat_task: Option<HeartbeatTask>,
315 state_report_handler: Option<StateReportHandler>,
317 frontend_client: Arc<FrontendClient>,
318}
319
320impl FlownodeBuilder {
321 pub fn new(
323 opts: FlownodeOptions,
324 plugins: Plugins,
325 table_meta: TableMetadataManagerRef,
326 catalog_manager: CatalogManagerRef,
327 flow_metadata_manager: FlowMetadataManagerRef,
328 frontend_client: Arc<FrontendClient>,
329 ) -> Self {
330 Self {
331 opts,
332 plugins,
333 table_meta,
334 catalog_manager,
335 flow_metadata_manager,
336 heartbeat_task: None,
337 state_report_handler: None,
338 frontend_client,
339 }
340 }
341
342 pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
343 let (sender, receiver) = SizeReportSender::new();
344 Self {
345 heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)),
346 state_report_handler: Some(receiver),
347 ..self
348 }
349 }
350
351 pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
352 let query_engine_factory = QueryEngineFactory::new_with_plugins(
354 self.catalog_manager.clone(),
356 None,
357 None,
358 None,
359 None,
360 None,
361 false,
362 Default::default(),
363 self.opts.query.clone(),
364 );
365 let manager = Arc::new(
366 self.build_manager(query_engine_factory.query_engine())
367 .await?,
368 );
369 let batching = Arc::new(BatchingEngine::new(
370 self.frontend_client.clone(),
371 query_engine_factory.query_engine(),
372 self.flow_metadata_manager.clone(),
373 self.table_meta.clone(),
374 self.catalog_manager.clone(),
375 self.opts.flow.batching_mode.clone(),
376 ));
377 let dual = Arc::new(FlowDualEngine::new(
378 manager.clone(),
379 batching,
380 self.flow_metadata_manager.clone(),
381 self.catalog_manager.clone(),
382 self.plugins.clone(),
383 ));
384 if let Some(handler) = self.state_report_handler.take() {
385 dual.set_state_report_handler(handler).await;
386 }
387
388 let server = FlownodeServer::new(FlowService::new(dual));
389
390 let heartbeat_task = self.heartbeat_task;
391
392 let instance = FlownodeInstance {
393 flownode_server: server,
394 services: ServerHandlers::default(),
395 heartbeat_task,
396 };
397 Ok(instance)
398 }
399
400 async fn build_manager(
403 &mut self,
404 query_engine: Arc<dyn QueryEngine>,
405 ) -> Result<StreamingEngine, Error> {
406 let table_meta = self.table_meta.clone();
407
408 register_function_to_query_engine(&query_engine);
409
410 let num_workers = self.opts.flow.num_workers;
411
412 let node_id = self.opts.node_id.map(|id| id as u32);
413
414 let mut man = StreamingEngine::new(node_id, query_engine, table_meta);
415 for worker_id in 0..num_workers {
416 let (tx, rx) = oneshot::channel();
417
418 let _handle = std::thread::Builder::new()
419 .name(format!("flow-worker-{}", worker_id))
420 .spawn(move || {
421 let (handle, mut worker) = create_worker();
422 let _ = tx.send(handle);
423 info!("Flow Worker started in new thread");
424 worker.run();
425 });
426 let worker_handle = rx.await.map_err(|e| {
427 UnexpectedSnafu {
428 reason: format!("Failed to receive worker handle: {}", e),
429 }
430 .build()
431 })?;
432 man.add_worker_handle(worker_handle);
433 }
434 info!("Flow Node Manager started");
435 Ok(man)
436 }
437}
438
439pub struct FlownodeServiceBuilder<'a> {
441 opts: &'a FlownodeOptions,
442 grpc_server: Option<GrpcServer>,
443 enable_http_service: bool,
444}
445
446impl<'a> FlownodeServiceBuilder<'a> {
447 pub fn new(opts: &'a FlownodeOptions) -> Self {
448 Self {
449 opts,
450 grpc_server: None,
451 enable_http_service: false,
452 }
453 }
454
455 pub fn enable_http_service(self) -> Self {
456 Self {
457 enable_http_service: true,
458 ..self
459 }
460 }
461
462 pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
463 Self {
464 grpc_server: Some(grpc_server),
465 ..self
466 }
467 }
468
469 pub fn with_default_grpc_server(mut self, flownode_server: &FlownodeServer) -> Self {
470 let grpc_server = Self::grpc_server_builder(self.opts, flownode_server).build();
471 self.grpc_server = Some(grpc_server);
472 self
473 }
474
475 pub fn build(mut self) -> Result<ServerHandlers, Error> {
476 let handlers = ServerHandlers::default();
477 if let Some(grpc_server) = self.grpc_server.take() {
478 let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
479 addr: &self.opts.grpc.bind_addr,
480 })?;
481 let handler: ServerHandler = (Box::new(grpc_server), addr);
482 handlers.insert(handler);
483 }
484
485 if self.enable_http_service {
486 let http_server = HttpServerBuilder::new(self.opts.http.clone())
487 .with_metrics_handler(MetricsHandler)
488 .build();
489 let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
490 addr: &self.opts.http.addr,
491 })?;
492 let handler: ServerHandler = (Box::new(http_server), addr);
493 handlers.insert(handler);
494 }
495 Ok(handlers)
496 }
497
498 pub fn grpc_server_builder(
499 opts: &FlownodeOptions,
500 flownode_server: &FlownodeServer,
501 ) -> GrpcServerBuilder {
502 let config = GrpcServerConfig {
503 max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
504 max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
505 tls: opts.grpc.tls.clone(),
506 max_connection_age: opts.grpc.max_connection_age,
507 };
508 let service = flownode_server.create_flow_service();
509 let runtime = common_runtime::global_runtime();
510 let mut builder = GrpcServerBuilder::new(config, runtime);
511 add_service!(builder, service);
512 builder
513 }
514}
515
516#[derive(Clone)]
521pub struct FrontendInvoker {
522 inserter: Arc<Inserter>,
523 deleter: Arc<Deleter>,
524 statement_executor: Arc<StatementExecutor>,
525}
526
527impl FrontendInvoker {
528 pub fn new(
529 inserter: Arc<Inserter>,
530 deleter: Arc<Deleter>,
531 statement_executor: Arc<StatementExecutor>,
532 ) -> Self {
533 Self {
534 inserter,
535 deleter,
536 statement_executor,
537 }
538 }
539
540 pub async fn build_from(
541 flow_streaming_engine: FlowStreamingEngineRef,
542 catalog_manager: CatalogManagerRef,
543 kv_backend: KvBackendRef,
544 layered_cache_registry: LayeredCacheRegistryRef,
545 procedure_executor: ProcedureExecutorRef,
546 node_manager: NodeManagerRef,
547 origin_frontend_addr: String,
548 ) -> Result<FrontendInvoker, Error> {
549 let table_route_cache: TableRouteCacheRef =
550 layered_cache_registry.get().context(CacheRequiredSnafu {
551 name: TABLE_ROUTE_CACHE_NAME,
552 })?;
553 let partition_info_cache: PartitionInfoCacheRef =
554 layered_cache_registry.get().context(CacheRequiredSnafu {
555 name: PARTITION_INFO_CACHE_NAME,
556 })?;
557
558 let partition_manager = Arc::new(PartitionRuleManager::new(
559 kv_backend.clone(),
560 table_route_cache.clone(),
561 partition_info_cache.clone(),
562 ));
563
564 let table_flownode_cache: TableFlownodeSetCacheRef =
565 layered_cache_registry.get().context(CacheRequiredSnafu {
566 name: TABLE_FLOWNODE_SET_CACHE_NAME,
567 })?;
568
569 let inserter = Arc::new(Inserter::new(
570 catalog_manager.clone(),
571 partition_manager.clone(),
572 node_manager.clone(),
573 table_flownode_cache,
574 ));
575
576 let deleter = Arc::new(Deleter::new(
577 catalog_manager.clone(),
578 partition_manager.clone(),
579 node_manager.clone(),
580 ));
581
582 let query_engine = flow_streaming_engine.query_engine.clone();
583
584 let statement_executor = Arc::new(StatementExecutor::new(
585 catalog_manager.clone(),
586 query_engine.clone(),
587 procedure_executor.clone(),
588 kv_backend.clone(),
589 layered_cache_registry.clone(),
590 inserter.clone(),
591 partition_manager,
592 None,
593 origin_frontend_addr,
594 ));
595
596 let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);
597 Ok(invoker)
598 }
599}
600
601impl FrontendInvoker {
602 pub async fn row_inserts(
603 &self,
604 requests: RowInsertRequests,
605 ctx: QueryContextRef,
606 ) -> common_frontend::error::Result<Output> {
607 let _timer = METRIC_FLOW_PROCESSING_TIME
608 .with_label_values(&["output_insert"])
609 .start_timer();
610
611 self.inserter
612 .handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
613 .await
614 .map_err(BoxedError::new)
615 .context(common_frontend::error::ExternalSnafu)
616 }
617
618 pub async fn row_deletes(
619 &self,
620 requests: RowDeleteRequests,
621 ctx: QueryContextRef,
622 ) -> common_frontend::error::Result<Output> {
623 let _timer = METRIC_FLOW_PROCESSING_TIME
624 .with_label_values(&["output_delete"])
625 .start_timer();
626
627 self.deleter
628 .handle_row_deletes(requests, ctx)
629 .await
630 .map_err(BoxedError::new)
631 .context(common_frontend::error::ExternalSnafu)
632 }
633
634 pub fn statement_executor(&self) -> Arc<StatementExecutor> {
635 self.statement_executor.clone()
636 }
637}
638
639pub(crate) async fn get_all_flow_ids(
641 flow_metadata_manager: &FlowMetadataManagerRef,
642 catalog_manager: &CatalogManagerRef,
643 nodeid: Option<u64>,
644) -> Result<Vec<u32>, Error> {
645 let ret = if let Some(nodeid) = nodeid {
646 let flow_ids_one_node = flow_metadata_manager
647 .flownode_flow_manager()
648 .flows(nodeid)
649 .try_collect::<Vec<_>>()
650 .await
651 .context(ListFlowsSnafu { id: Some(nodeid) })?;
652 flow_ids_one_node.into_iter().map(|(id, _)| id).collect()
653 } else {
654 let all_catalogs = catalog_manager
655 .catalog_names()
656 .await
657 .map_err(BoxedError::new)
658 .context(ExternalSnafu)?;
659 let mut all_flow_ids = vec![];
660 for catalog in all_catalogs {
661 let flows = flow_metadata_manager
662 .flow_name_manager()
663 .flow_names(&catalog)
664 .await
665 .try_collect::<Vec<_>>()
666 .await
667 .map_err(BoxedError::new)
668 .context(ExternalSnafu)?;
669
670 all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
671 }
672 all_flow_ids
673 };
674
675 Ok(ret)
676}
677
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use catalog::memory::new_memory_catalog_manager;
    use common_base::Plugins;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use query::options::QueryOptions;

    use super::*;
    use crate::adapter::flownode_impl::FlowDualEngine;
    use crate::batching_mode::BatchingModeOptions;
    use crate::batching_mode::engine::BatchingEngine;
    use crate::utils::SizeReportSender;

    /// Builds a [`FlownodeServer`] on in-memory backends and returns it together with
    /// the sender half of the size report channel installed on its dual engine.
    async fn new_test_flownode_server() -> (FlownodeServer, SizeReportSender) {
        // Metadata managers share one in-memory key-value backend.
        let backend = Arc::new(MemoryKvBackend::new());
        let table_meta = Arc::new(TableMetadataManager::new(backend.clone()));
        table_meta.init().await.unwrap();
        let flow_meta = Arc::new(FlowMetadataManager::new(backend.clone()));
        let catalog_manager = new_memory_catalog_manager().unwrap();
        let query_engine = crate::test_utils::create_test_query_engine();

        let streaming = Arc::new(StreamingEngine::new(
            None,
            query_engine.clone(),
            table_meta.clone(),
        ));

        let (frontend_client, _handler) =
            FrontendClient::from_empty_grpc_handler(QueryOptions::default());
        let batching = Arc::new(BatchingEngine::new(
            Arc::new(frontend_client),
            query_engine,
            flow_meta.clone(),
            table_meta,
            catalog_manager.clone(),
            BatchingModeOptions::default(),
        ));

        let dual = Arc::new(FlowDualEngine::new(
            streaming,
            batching,
            flow_meta,
            catalog_manager,
            Plugins::new(),
        ));

        let (sender, handler) = SizeReportSender::new();
        dual.set_state_report_handler(handler).await;

        (FlownodeServer::new(FlowService::new(dual)), sender)
    }

    /// State report queries must keep succeeding while workers are running, after they
    /// were stopped, and after they were started again.
    #[tokio::test]
    async fn test_state_report_handler_survives_worker_restart() {
        let (server, sender) = new_test_flownode_server().await;
        let timeout = Duration::from_secs(3);

        // First run.
        server.start_workers().await.unwrap();
        sender.query(timeout).await.unwrap();

        // Stopped: the report channel must still answer.
        server.stop_workers().await.unwrap();
        sender.query(timeout).await.unwrap();

        // Restarted.
        server.start_workers().await.unwrap();
        sender.query(timeout).await.unwrap();

        server.stop_workers().await.unwrap();
    }
}