From e6cc4df8c89eaf1f53ea626b69ca81dce0b21e16 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Mon, 5 Aug 2024 21:57:48 +0800 Subject: [PATCH] feat: flow recreate on reboot (#4509) * feat: flow reboot clean * refactor: per review * refactor: per review * test: sqlness flow reboot --- src/cmd/src/flownode.rs | 3 + src/cmd/src/standalone.rs | 3 +- src/flow/src/error.rs | 11 ++- src/flow/src/server.rs | 95 ++++++++++++++++++- tests-integration/src/standalone.rs | 1 + .../standalone/common/flow/flow_basic.result | 1 + .../standalone/common/flow/flow_basic.sql | 1 + 7 files changed, 110 insertions(+), 5 deletions(-) diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 328693f326..60ec6c6614 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -24,6 +24,7 @@ use common_grpc::channel_manager::ChannelConfig; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; +use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_telemetry::info; use common_telemetry::logging::TracingOptions; @@ -296,11 +297,13 @@ impl StartCommand { Arc::new(executor), ); + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone())); let flownode_builder = FlownodeBuilder::new( opts, Plugins::new(), table_metadata_manager, catalog_manager.clone(), + flow_metadata_manager, ) .with_heartbeat_task(heartbeat_task); diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index ba25ab555f..efa360713f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -476,11 +476,13 @@ impl StartCommand { .await .context(StartDatanodeSnafu)?; + let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let flow_builder = FlownodeBuilder::new( Default::default(), plugins.clone(), table_metadata_manager.clone(), catalog_manager.clone(), + flow_metadata_manager.clone(), ); let flownode = Arc::new( flow_builder @@ -511,7 +513,6 @@ impl StartCommand { opts.wal.into(), kv_backend.clone(), )); - let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let table_meta_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 3b8877ed86..8b4f3adc65 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -83,6 +83,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to list flows in flownode={id:?}"))] + ListFlows { + id: Option, + source: common_meta::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Flow already exist, id={id}"))] FlowAlreadyExist { id: FlowId, @@ -214,7 +222,8 @@ impl ErrorExt for Error { } Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } - | Self::FlowNotFound { .. } => StatusCode::TableNotFound, + | Self::FlowNotFound { .. } + | Self::ListFlows { .. } => StatusCode::TableNotFound, Self::InvalidQueryProst { .. } | &Self::InvalidQuery { .. } | &Self::Plan { .. } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index a8c850349f..d470eb0ad8 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -29,12 +29,13 @@ use common_meta::cache::{ }; use common_meta::ddl::{table_meta, ProcedureExecutorRef}; use common_meta::heartbeat::handler::HandlerGroupExecutor; +use common_meta::key::flow::FlowMetadataManagerRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::{self, Flownode, NodeManagerRef}; use common_query::Output; use common_telemetry::tracing::info; -use futures::FutureExt; +use futures::{FutureExt, StreamExt, TryStreamExt}; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; use meta_client::client::MetaClient; @@ -47,7 +48,7 @@ use serde::de::Unexpected; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; use servers::heartbeat_options::HeartbeatOptions; use servers::server::Server; -use session::context::QueryContextRef; +use session::context::{QueryContext, QueryContextBuilder, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::{broadcast, oneshot, Mutex}; @@ -57,7 +58,8 @@ use tonic::{Request, Response, Status}; use crate::adapter::FlowWorkerManagerRef; use crate::error::{ - CacheRequiredSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, + CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu, + ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, }; use crate::heartbeat::HeartbeatTask; use crate::transform::register_function_to_query_engine; @@ -240,6 +242,7 @@ pub struct FlownodeBuilder { plugins: Plugins, table_meta: TableMetadataManagerRef, catalog_manager: CatalogManagerRef, + flow_metadata_manager: FlowMetadataManagerRef, heartbeat_task: Option, } @@ -250,12 +253,14 @@ impl FlownodeBuilder { plugins: Plugins, table_meta: TableMetadataManagerRef, catalog_manager: CatalogManagerRef, + flow_metadata_manager: FlowMetadataManagerRef, ) -> Self { Self { opts, plugins, table_meta, catalog_manager, + flow_metadata_manager, heartbeat_task: None, } } @@ -283,6 +288,11 @@ impl FlownodeBuilder { self.build_manager(query_engine_factory.query_engine()) .await?, ); + + if let Err(err) = self.recover_flows(&manager).await { + common_telemetry::error!(err; "Failed to recover flows"); + } + let server = FlownodeServer::new(FlowService::new(manager.clone())); let heartbeat_task = self.heartbeat_task; @@ -296,6 +306,85 @@ impl FlownodeBuilder { Ok(instance) } + /// recover all flow tasks in this flownode in distributed mode(nodeid is Some()) + /// + /// or recover all existing flow tasks if in standalone mode(nodeid is None) + /// + /// TODO(discord9): persisent flow tasks with internal state + async fn recover_flows(&self, manager: &FlowWorkerManagerRef) -> Result { + let nodeid = self.opts.node_id; + let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid { + let to_be_recover = self + .flow_metadata_manager + .flownode_flow_manager() + .flows(nodeid) + .try_collect::>() + .await + .context(ListFlowsSnafu { id: Some(nodeid) })?; + to_be_recover.into_iter().map(|(id, _)| id).collect() + } else { + let all_catalogs = self + .catalog_manager + .catalog_names() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let mut all_flow_ids = vec![]; + for catalog in all_catalogs { + let flows = self + .flow_metadata_manager + .flow_name_manager() + .flow_names(&catalog) + .await + .try_collect::>() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id())); + } + all_flow_ids + }; + let cnt = to_be_recovered.len(); + + // TODO(discord9): recover in parallel + for flow_id in to_be_recovered { + let info = self + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .context(FlowNotFoundSnafu { id: flow_id })?; + + let sink_table_name = [ + info.sink_table_name().catalog_name.clone(), + info.sink_table_name().schema_name.clone(), + info.sink_table_name().table_name.clone(), + ]; + manager + .create_flow( + flow_id as _, + sink_table_name, + info.source_table_ids(), + true, + info.expire_after(), + Some(info.comment().clone()), + info.raw_sql().clone(), + info.options().clone(), + Some( + QueryContextBuilder::default() + .current_catalog(info.catalog_name().clone()) + .build(), + ), + ) + .await?; + } + + Ok(cnt) + } + /// build [`FlowWorkerManager`], note this doesn't take ownership of `self`, /// nor does it actually start running the worker. async fn build_manager( diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 9de4e59498..9f7188568f 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -156,6 +156,7 @@ impl GreptimeDbStandaloneBuilder { plugins.clone(), table_metadata_manager.clone(), catalog_manager.clone(), + flow_metadata_manager.clone(), ); let flownode = Arc::new(flow_builder.build().await.unwrap()); diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index e5983ea8d4..3c49535b0f 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -24,6 +24,7 @@ select flush_flow('test_numbers_basic')<=1; | true | +----------------------------------------------------+ +-- SQLNESS ARG restart=true INSERT INTO numbers_input_basic VALUES (20, "2021-07-01 00:00:00.200"), diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index dab3d78f83..1f282cca2e 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -14,6 +14,7 @@ SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '202 -- because flush_flow result is at most 1 select flush_flow('test_numbers_basic')<=1; +-- SQLNESS ARG restart=true INSERT INTO numbers_input_basic VALUES (20, "2021-07-01 00:00:00.200"),