Skip to main content

flow/
server.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Implementation of grpc service for flow node
16
17use std::net::SocketAddr;
18use std::sync::Arc;
19
20use api::v1::flow::DirtyWindowRequests;
21use api::v1::{RowDeleteRequests, RowInsertRequests};
22use cache::{PARTITION_INFO_CACHE_NAME, TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
23use catalog::CatalogManagerRef;
24use common_base::Plugins;
25use common_error::ext::BoxedError;
26use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
27use common_meta::key::TableMetadataManagerRef;
28use common_meta::key::flow::FlowMetadataManagerRef;
29use common_meta::kv_backend::KvBackendRef;
30use common_meta::node_manager::{Flownode, NodeManagerRef};
31use common_meta::procedure_executor::ProcedureExecutorRef;
32use common_query::Output;
33use common_runtime::JoinHandle;
34use common_telemetry::tracing::info;
35use futures::TryStreamExt;
36use greptime_proto::v1::flow::{FlowRequest, FlowResponse, InsertRequests, flow_server};
37use itertools::Itertools;
38use operator::delete::Deleter;
39use operator::insert::Inserter;
40use operator::statement::StatementExecutor;
41use partition::cache::PartitionInfoCacheRef;
42use partition::manager::PartitionRuleManager;
43use query::{QueryEngine, QueryEngineFactory};
44use servers::add_service;
45use servers::grpc::builder::GrpcServerBuilder;
46use servers::grpc::{GrpcServer, GrpcServerConfig};
47use servers::http::HttpServerBuilder;
48use servers::metrics_handler::MetricsHandler;
49use servers::server::{ServerHandler, ServerHandlers};
50use session::context::QueryContextRef;
51use snafu::{OptionExt, ResultExt};
52use tokio::sync::{Mutex, broadcast, oneshot};
53use tonic::codec::CompressionEncoding;
54use tonic::{Request, Response, Status};
55
56use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
57use crate::adapter::{FlowStreamingEngineRef, create_worker};
58use crate::batching_mode::engine::BatchingEngine;
59use crate::error::{
60    CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu,
61    ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
62};
63use crate::heartbeat::HeartbeatTask;
64use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
65use crate::transform::register_function_to_query_engine;
66use crate::utils::{SizeReportSender, StateReportHandler};
67use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
68
/// Name of the flow node server.
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// Newtype wrapping the flow engine handle so that the gRPC `Flow` trait can
/// be implemented on it (avoids the orphan rule with `Arc<...>`).
#[derive(Clone)]
pub struct FlowService {
    /// Engine that every request received by this service is forwarded to.
    pub dual_engine: FlowDualEngineRef,
}
75
76impl FlowService {
77    pub fn new(manager: FlowDualEngineRef) -> Self {
78        Self {
79            dual_engine: manager,
80        }
81    }
82}
83
84#[async_trait::async_trait]
85impl flow_server::Flow for FlowService {
86    async fn handle_create_remove(
87        &self,
88        request: Request<FlowRequest>,
89    ) -> Result<Response<FlowResponse>, Status> {
90        let _timer = METRIC_FLOW_PROCESSING_TIME
91            .with_label_values(&["ddl"])
92            .start_timer();
93
94        let request = request.into_inner();
95        self.dual_engine
96            .handle(request)
97            .await
98            .map_err(|err| {
99                common_telemetry::error!(err; "Failed to handle flow request");
100                err
101            })
102            .map(Response::new)
103            .map_err(to_status_with_last_err)
104    }
105
106    async fn handle_mirror_request(
107        &self,
108        request: Request<InsertRequests>,
109    ) -> Result<Response<FlowResponse>, Status> {
110        let _timer = METRIC_FLOW_PROCESSING_TIME
111            .with_label_values(&["insert"])
112            .start_timer();
113
114        let request = request.into_inner();
115        // TODO(discord9): fix protobuf import order shenanigans to remove this duplicated define
116        let mut row_count = 0;
117        let request = api::v1::region::InsertRequests {
118            requests: request
119                .requests
120                .into_iter()
121                .map(|insert| {
122                    insert.rows.as_ref().inspect(|x| row_count += x.rows.len());
123                    api::v1::region::InsertRequest {
124                        region_id: insert.region_id,
125                        rows: insert.rows,
126                        partition_expr_version: insert.partition_expr_version,
127                    }
128                })
129                .collect_vec(),
130        };
131
132        METRIC_FLOW_ROWS
133            .with_label_values(&["in"])
134            .inc_by(row_count as u64);
135
136        self.dual_engine
137            .handle_inserts(request)
138            .await
139            .map(Response::new)
140            .map_err(to_status_with_last_err)
141    }
142
143    async fn handle_mark_dirty_time_window(
144        &self,
145        reqs: Request<DirtyWindowRequests>,
146    ) -> Result<Response<FlowResponse>, Status> {
147        self.dual_engine
148            .handle_mark_window_dirty(reqs.into_inner())
149            .await
150            .map(Response::new)
151            .map_err(to_status_with_last_err)
152    }
153}
154
/// Cloneable handle to the flow node server; clones share the same inner
/// state through an [`Arc`].
#[derive(Clone)]
pub struct FlownodeServer {
    /// State shared by all clones of this handle.
    inner: Arc<FlownodeServerInner>,
}
159
/// Inner state of [`FlownodeServer`].
///
/// Mostly useful for constructing, starting and stopping the flow node server.
struct FlownodeServerInner {
    /// Worker shutdown signal, not to be confused with `server_shutdown_tx`.
    worker_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Server shutdown signal, used to shut down the gRPC server.
    server_shutdown_tx: Mutex<broadcast::Sender<()>>,
    /// Handle of the streaming background task; set when the workers start.
    streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
    /// Handle of the state report task; kept across worker restarts.
    state_report_task_handler: Mutex<Option<JoinHandle<()>>>,
    /// Service whose engine the background tasks are started on.
    flow_service: FlowService,
}
174
175impl FlownodeServer {
176    pub fn new(flow_service: FlowService) -> Self {
177        let (tx, _rx) = broadcast::channel::<()>(1);
178        let (server_tx, _server_rx) = broadcast::channel::<()>(1);
179        Self {
180            inner: Arc::new(FlownodeServerInner {
181                flow_service,
182                worker_shutdown_tx: Mutex::new(tx),
183                server_shutdown_tx: Mutex::new(server_tx),
184                streaming_task_handler: Mutex::new(None),
185                state_report_task_handler: Mutex::new(None),
186            }),
187        }
188    }
189
190    /// Start the background task for streaming computation.
191    ///
192    /// Should be called only after heartbeat is establish, hence can get cluster info
193    async fn start_workers(&self) -> Result<(), Error> {
194        let manager_ref = self.inner.flow_service.dual_engine.clone();
195        let mut state_report_task_handler = self.inner.state_report_task_handler.lock().await;
196        if state_report_task_handler.is_none() {
197            *state_report_task_handler = manager_ref.clone().start_state_report_task().await;
198        }
199        drop(state_report_task_handler);
200        let handle = manager_ref
201            .streaming_engine()
202            .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
203        self.inner
204            .streaming_task_handler
205            .lock()
206            .await
207            .replace(handle);
208
209        self.inner
210            .flow_service
211            .dual_engine
212            .start_flow_consistent_check_task()
213            .await?;
214
215        Ok(())
216    }
217
218    /// Stop the background task for streaming computation.
219    async fn stop_workers(&self) -> Result<(), Error> {
220        let tx = self.inner.worker_shutdown_tx.lock().await;
221        if tx.send(()).is_err() {
222            info!("Receiver dropped, the flow node server has already shutdown");
223        }
224        // Keep state_report_task_handler alive across worker restarts.
225        // Dropping it here would permanently lose the report channel receiver.
226        self.inner
227            .flow_service
228            .dual_engine
229            .stop_flow_consistent_check_task()
230            .await?;
231        Ok(())
232    }
233}
234
235impl FlownodeServer {
236    pub fn create_flow_service(&self) -> flow_server::FlowServer<impl flow_server::Flow> {
237        flow_server::FlowServer::new(self.inner.flow_service.clone())
238            .accept_compressed(CompressionEncoding::Gzip)
239            .send_compressed(CompressionEncoding::Gzip)
240            .accept_compressed(CompressionEncoding::Zstd)
241            .send_compressed(CompressionEncoding::Zstd)
242    }
243}
244
/// The flownode server instance.
pub struct FlownodeInstance {
    /// Server owning the flow engine and its background workers.
    flownode_server: FlownodeServer,
    /// Servers started and shut down together with this instance.
    services: ServerHandlers,
    /// Optional heartbeat task, started before the workers when present.
    heartbeat_task: Option<HeartbeatTask>,
}
251
252impl FlownodeInstance {
253    pub async fn start(&mut self) -> Result<(), crate::Error> {
254        if let Some(task) = &self.heartbeat_task {
255            task.start().await?;
256        }
257
258        self.flownode_server.start_workers().await?;
259
260        self.services.start_all().await.context(StartServerSnafu)?;
261
262        Ok(())
263    }
264    pub async fn shutdown(&mut self) -> Result<(), Error> {
265        self.services
266            .shutdown_all()
267            .await
268            .context(ShutdownServerSnafu)?;
269
270        self.flownode_server.stop_workers().await?;
271
272        if let Some(task) = &self.heartbeat_task {
273            task.shutdown();
274        }
275
276        Ok(())
277    }
278
279    pub fn flownode_server(&self) -> &FlownodeServer {
280        &self.flownode_server
281    }
282
283    pub fn flow_engine(&self) -> FlowDualEngineRef {
284        self.flownode_server.inner.flow_service.dual_engine.clone()
285    }
286
287    pub fn setup_services(&mut self, services: ServerHandlers) {
288        self.services = services;
289    }
290}
291
292pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
293    if let Some(user_provider) = fn_opts.user_provider.as_ref() {
294        let static_provider = auth::static_user_provider_from_option(user_provider)
295            .context(IllegalAuthConfigSnafu)?;
296
297        let (usr, pwd) = static_provider
298            .get_one_user_pwd()
299            .context(IllegalAuthConfigSnafu)?;
300        let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
301        return Ok(Some(auth_header));
302    }
303
304    Ok(None)
305}
306
/// [`FlownodeInstance`] Builder
pub struct FlownodeBuilder {
    /// Flownode options; the query, flow and node id settings are read during build.
    opts: FlownodeOptions,
    /// Plugins handed to the dual engine.
    plugins: Plugins,
    /// Table metadata manager shared by the streaming and batching engines.
    table_meta: TableMetadataManagerRef,
    /// Catalog manager shared by the query engine factory and the engines.
    catalog_manager: CatalogManagerRef,
    /// Flow metadata manager shared by the batching and dual engines.
    flow_metadata_manager: FlowMetadataManagerRef,
    /// Heartbeat task passed on to the built instance, if any.
    heartbeat_task: Option<HeartbeatTask>,
    /// Receiving side of the state size report channel, installed on the dual
    /// engine during build. Only set together with the heartbeat task.
    state_report_handler: Option<StateReportHandler>,
    /// Frontend client used by the batching engine.
    frontend_client: Arc<FrontendClient>,
}
319
320impl FlownodeBuilder {
321    /// init flownode builder
322    pub fn new(
323        opts: FlownodeOptions,
324        plugins: Plugins,
325        table_meta: TableMetadataManagerRef,
326        catalog_manager: CatalogManagerRef,
327        flow_metadata_manager: FlowMetadataManagerRef,
328        frontend_client: Arc<FrontendClient>,
329    ) -> Self {
330        Self {
331            opts,
332            plugins,
333            table_meta,
334            catalog_manager,
335            flow_metadata_manager,
336            heartbeat_task: None,
337            state_report_handler: None,
338            frontend_client,
339        }
340    }
341
342    pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self {
343        let (sender, receiver) = SizeReportSender::new();
344        Self {
345            heartbeat_task: Some(heartbeat_task.with_query_stat_size(sender)),
346            state_report_handler: Some(receiver),
347            ..self
348        }
349    }
350
351    pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
352        // TODO(discord9): does this query engine need those?
353        let query_engine_factory = QueryEngineFactory::new_with_plugins(
354            // query engine in flownode is only used for translate plan with resolved table source.
355            self.catalog_manager.clone(),
356            None,
357            None,
358            None,
359            None,
360            None,
361            false,
362            Default::default(),
363            self.opts.query.clone(),
364        );
365        let manager = Arc::new(
366            self.build_manager(query_engine_factory.query_engine())
367                .await?,
368        );
369        let batching = Arc::new(BatchingEngine::new(
370            self.frontend_client.clone(),
371            query_engine_factory.query_engine(),
372            self.flow_metadata_manager.clone(),
373            self.table_meta.clone(),
374            self.catalog_manager.clone(),
375            self.opts.flow.batching_mode.clone(),
376        ));
377        let dual = Arc::new(FlowDualEngine::new(
378            manager.clone(),
379            batching,
380            self.flow_metadata_manager.clone(),
381            self.catalog_manager.clone(),
382            self.plugins.clone(),
383        ));
384        if let Some(handler) = self.state_report_handler.take() {
385            dual.set_state_report_handler(handler).await;
386        }
387
388        let server = FlownodeServer::new(FlowService::new(dual));
389
390        let heartbeat_task = self.heartbeat_task;
391
392        let instance = FlownodeInstance {
393            flownode_server: server,
394            services: ServerHandlers::default(),
395            heartbeat_task,
396        };
397        Ok(instance)
398    }
399
400    /// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
401    /// nor does it actually start running the worker.
402    async fn build_manager(
403        &mut self,
404        query_engine: Arc<dyn QueryEngine>,
405    ) -> Result<StreamingEngine, Error> {
406        let table_meta = self.table_meta.clone();
407
408        register_function_to_query_engine(&query_engine);
409
410        let num_workers = self.opts.flow.num_workers;
411
412        let node_id = self.opts.node_id.map(|id| id as u32);
413
414        let mut man = StreamingEngine::new(node_id, query_engine, table_meta);
415        for worker_id in 0..num_workers {
416            let (tx, rx) = oneshot::channel();
417
418            let _handle = std::thread::Builder::new()
419                .name(format!("flow-worker-{}", worker_id))
420                .spawn(move || {
421                    let (handle, mut worker) = create_worker();
422                    let _ = tx.send(handle);
423                    info!("Flow Worker started in new thread");
424                    worker.run();
425                });
426            let worker_handle = rx.await.map_err(|e| {
427                UnexpectedSnafu {
428                    reason: format!("Failed to receive worker handle: {}", e),
429                }
430                .build()
431            })?;
432            man.add_worker_handle(worker_handle);
433        }
434        info!("Flow Node Manager started");
435        Ok(man)
436    }
437}
438
/// Builder for the servers exposed by a flownode (gRPC and, optionally, HTTP).
///
/// Useful in distributed mode.
pub struct FlownodeServiceBuilder<'a> {
    /// Options providing the bind addresses and the gRPC/HTTP settings.
    opts: &'a FlownodeOptions,
    /// gRPC server to register, if one was supplied.
    grpc_server: Option<GrpcServer>,
    /// Whether an HTTP server is built and registered as well.
    enable_http_service: bool,
}
445
446impl<'a> FlownodeServiceBuilder<'a> {
447    pub fn new(opts: &'a FlownodeOptions) -> Self {
448        Self {
449            opts,
450            grpc_server: None,
451            enable_http_service: false,
452        }
453    }
454
455    pub fn enable_http_service(self) -> Self {
456        Self {
457            enable_http_service: true,
458            ..self
459        }
460    }
461
462    pub fn with_grpc_server(self, grpc_server: GrpcServer) -> Self {
463        Self {
464            grpc_server: Some(grpc_server),
465            ..self
466        }
467    }
468
469    pub fn with_default_grpc_server(mut self, flownode_server: &FlownodeServer) -> Self {
470        let grpc_server = Self::grpc_server_builder(self.opts, flownode_server).build();
471        self.grpc_server = Some(grpc_server);
472        self
473    }
474
475    pub fn build(mut self) -> Result<ServerHandlers, Error> {
476        let handlers = ServerHandlers::default();
477        if let Some(grpc_server) = self.grpc_server.take() {
478            let addr: SocketAddr = self.opts.grpc.bind_addr.parse().context(ParseAddrSnafu {
479                addr: &self.opts.grpc.bind_addr,
480            })?;
481            let handler: ServerHandler = (Box::new(grpc_server), addr);
482            handlers.insert(handler);
483        }
484
485        if self.enable_http_service {
486            let http_server = HttpServerBuilder::new(self.opts.http.clone())
487                .with_metrics_handler(MetricsHandler)
488                .build();
489            let addr: SocketAddr = self.opts.http.addr.parse().context(ParseAddrSnafu {
490                addr: &self.opts.http.addr,
491            })?;
492            let handler: ServerHandler = (Box::new(http_server), addr);
493            handlers.insert(handler);
494        }
495        Ok(handlers)
496    }
497
498    pub fn grpc_server_builder(
499        opts: &FlownodeOptions,
500        flownode_server: &FlownodeServer,
501    ) -> GrpcServerBuilder {
502        let config = GrpcServerConfig {
503            max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
504            max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
505            tls: opts.grpc.tls.clone(),
506            max_connection_age: opts.grpc.max_connection_age,
507        };
508        let service = flownode_server.create_flow_service();
509        let runtime = common_runtime::global_runtime();
510        let mut builder = GrpcServerBuilder::new(config, runtime);
511        add_service!(builder, service);
512        builder
513    }
514}
515
/// Basically a tiny frontend that communicates with datanodes. Unlike
/// [`FrontendClient`], which connects to a real frontend, this is used by
/// flow's streaming engine and is meant for simple queries.
///
/// For heavy queries use [`FrontendClient`], which offloads computation to the
/// frontend, lifting the load from the flownode.
#[derive(Clone)]
pub struct FrontendInvoker {
    /// Handles row insert requests.
    inserter: Arc<Inserter>,
    /// Handles row delete requests.
    deleter: Arc<Deleter>,
    /// Statement executor, also handed to the inserter on every row insert.
    statement_executor: Arc<StatementExecutor>,
}
526
527impl FrontendInvoker {
528    pub fn new(
529        inserter: Arc<Inserter>,
530        deleter: Arc<Deleter>,
531        statement_executor: Arc<StatementExecutor>,
532    ) -> Self {
533        Self {
534            inserter,
535            deleter,
536            statement_executor,
537        }
538    }
539
540    pub async fn build_from(
541        flow_streaming_engine: FlowStreamingEngineRef,
542        catalog_manager: CatalogManagerRef,
543        kv_backend: KvBackendRef,
544        layered_cache_registry: LayeredCacheRegistryRef,
545        procedure_executor: ProcedureExecutorRef,
546        node_manager: NodeManagerRef,
547        origin_frontend_addr: String,
548    ) -> Result<FrontendInvoker, Error> {
549        let table_route_cache: TableRouteCacheRef =
550            layered_cache_registry.get().context(CacheRequiredSnafu {
551                name: TABLE_ROUTE_CACHE_NAME,
552            })?;
553        let partition_info_cache: PartitionInfoCacheRef =
554            layered_cache_registry.get().context(CacheRequiredSnafu {
555                name: PARTITION_INFO_CACHE_NAME,
556            })?;
557
558        let partition_manager = Arc::new(PartitionRuleManager::new(
559            kv_backend.clone(),
560            table_route_cache.clone(),
561            partition_info_cache.clone(),
562        ));
563
564        let table_flownode_cache: TableFlownodeSetCacheRef =
565            layered_cache_registry.get().context(CacheRequiredSnafu {
566                name: TABLE_FLOWNODE_SET_CACHE_NAME,
567            })?;
568
569        let inserter = Arc::new(Inserter::new(
570            catalog_manager.clone(),
571            partition_manager.clone(),
572            node_manager.clone(),
573            table_flownode_cache,
574        ));
575
576        let deleter = Arc::new(Deleter::new(
577            catalog_manager.clone(),
578            partition_manager.clone(),
579            node_manager.clone(),
580        ));
581
582        let query_engine = flow_streaming_engine.query_engine.clone();
583
584        let statement_executor = Arc::new(StatementExecutor::new(
585            catalog_manager.clone(),
586            query_engine.clone(),
587            procedure_executor.clone(),
588            kv_backend.clone(),
589            layered_cache_registry.clone(),
590            inserter.clone(),
591            partition_manager,
592            None,
593            origin_frontend_addr,
594        ));
595
596        let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);
597        Ok(invoker)
598    }
599}
600
601impl FrontendInvoker {
602    pub async fn row_inserts(
603        &self,
604        requests: RowInsertRequests,
605        ctx: QueryContextRef,
606    ) -> common_frontend::error::Result<Output> {
607        let _timer = METRIC_FLOW_PROCESSING_TIME
608            .with_label_values(&["output_insert"])
609            .start_timer();
610
611        self.inserter
612            .handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
613            .await
614            .map_err(BoxedError::new)
615            .context(common_frontend::error::ExternalSnafu)
616    }
617
618    pub async fn row_deletes(
619        &self,
620        requests: RowDeleteRequests,
621        ctx: QueryContextRef,
622    ) -> common_frontend::error::Result<Output> {
623        let _timer = METRIC_FLOW_PROCESSING_TIME
624            .with_label_values(&["output_delete"])
625            .start_timer();
626
627        self.deleter
628            .handle_row_deletes(requests, ctx)
629            .await
630            .map_err(BoxedError::new)
631            .context(common_frontend::error::ExternalSnafu)
632    }
633
634    pub fn statement_executor(&self) -> Arc<StatementExecutor> {
635        self.statement_executor.clone()
636    }
637}
638
639/// get all flow ids in this flownode
640pub(crate) async fn get_all_flow_ids(
641    flow_metadata_manager: &FlowMetadataManagerRef,
642    catalog_manager: &CatalogManagerRef,
643    nodeid: Option<u64>,
644) -> Result<Vec<u32>, Error> {
645    let ret = if let Some(nodeid) = nodeid {
646        let flow_ids_one_node = flow_metadata_manager
647            .flownode_flow_manager()
648            .flows(nodeid)
649            .try_collect::<Vec<_>>()
650            .await
651            .context(ListFlowsSnafu { id: Some(nodeid) })?;
652        flow_ids_one_node.into_iter().map(|(id, _)| id).collect()
653    } else {
654        let all_catalogs = catalog_manager
655            .catalog_names()
656            .await
657            .map_err(BoxedError::new)
658            .context(ExternalSnafu)?;
659        let mut all_flow_ids = vec![];
660        for catalog in all_catalogs {
661            let flows = flow_metadata_manager
662                .flow_name_manager()
663                .flow_names(&catalog)
664                .await
665                .try_collect::<Vec<_>>()
666                .await
667                .map_err(BoxedError::new)
668                .context(ExternalSnafu)?;
669
670            all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
671        }
672        all_flow_ids
673    };
674
675    Ok(ret)
676}
677
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use catalog::memory::new_memory_catalog_manager;
    use common_base::Plugins;
    use common_meta::key::TableMetadataManager;
    use common_meta::key::flow::FlowMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use query::options::QueryOptions;

    use super::*;
    use crate::adapter::flownode_impl::FlowDualEngine;
    use crate::batching_mode::BatchingModeOptions;
    use crate::batching_mode::engine::BatchingEngine;
    use crate::utils::SizeReportSender;

    /// Timeout passed to every state size query in these tests.
    const QUERY_TIMEOUT: Duration = Duration::from_secs(3);

    /// Builds a [`FlownodeServer`] on in-memory metadata, together with the
    /// sender half of the size report channel installed on its engine.
    async fn new_test_flownode_server() -> (FlownodeServer, SizeReportSender) {
        let kv = Arc::new(MemoryKvBackend::new());
        let table_metadata = Arc::new(TableMetadataManager::new(kv.clone()));
        table_metadata.init().await.unwrap();
        let flow_metadata = Arc::new(FlowMetadataManager::new(kv.clone()));
        let catalogs = new_memory_catalog_manager().unwrap();
        let engine = crate::test_utils::create_test_query_engine();

        let streaming = Arc::new(StreamingEngine::new(
            None,
            engine.clone(),
            table_metadata.clone(),
        ));

        let (client, _handler) = FrontendClient::from_empty_grpc_handler(QueryOptions::default());
        let batching = Arc::new(BatchingEngine::new(
            Arc::new(client),
            engine,
            flow_metadata.clone(),
            table_metadata,
            catalogs.clone(),
            BatchingModeOptions::default(),
        ));

        let dual = Arc::new(FlowDualEngine::new(
            streaming,
            batching,
            flow_metadata,
            catalogs,
            Plugins::new(),
        ));

        let (sender, handler) = SizeReportSender::new();
        dual.set_state_report_handler(handler).await;

        (FlownodeServer::new(FlowService::new(dual)), sender)
    }

    /// State size queries must keep succeeding while the workers are running,
    /// after they are stopped, and after they are started again.
    #[tokio::test]
    async fn test_state_report_handler_survives_worker_restart() {
        let (server, sender) = new_test_flownode_server().await;

        // First run.
        server.start_workers().await.unwrap();
        sender.query(QUERY_TIMEOUT).await.unwrap();

        // Stopped: the report task must still answer.
        server.stop_workers().await.unwrap();
        sender.query(QUERY_TIMEOUT).await.unwrap();

        // Restarted.
        server.start_workers().await.unwrap();
        sender.query(QUERY_TIMEOUT).await.unwrap();

        server.stop_workers().await.unwrap();
    }
}