From e9fec341ec1778b2b866a4fd116432741dd70a1f Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 11 Mar 2026 20:29:46 +0800 Subject: [PATCH] feat: pending flow Signed-off-by: discord9 --- .../meta/src/cache/flow/table_flownode.rs | 6 +- src/common/meta/src/ddl.rs | 1 + src/common/meta/src/ddl/activate_flow.rs | 449 ++++++++++++++++++ src/common/meta/src/ddl/create_flow.rs | 70 ++- .../meta/src/ddl/create_flow/metadata.rs | 28 +- src/common/meta/src/ddl/flow_meta.rs | 7 +- src/common/meta/src/ddl/tests.rs | 1 + .../meta/src/ddl/tests/activate_flow.rs | 158 ++++++ src/common/meta/src/ddl/tests/create_flow.rs | 102 +++- src/common/meta/src/ddl_manager.rs | 15 +- src/common/meta/src/key/flow.rs | 12 + src/common/meta/src/key/flow/flow_info.rs | 133 +++++- src/meta-srv/src/error.rs | 16 + src/meta-srv/src/flow.rs | 120 +++++ src/meta-srv/src/lib.rs | 1 + src/meta-srv/src/metasrv.rs | 5 + src/meta-srv/src/metasrv/builder.rs | 11 + src/operator/src/statement/ddl.rs | 19 +- 18 files changed, 1111 insertions(+), 43 deletions(-) create mode 100644 src/common/meta/src/ddl/activate_flow.rs create mode 100644 src/common/meta/src/ddl/tests/activate_flow.rs create mode 100644 src/meta-srv/src/flow.rs diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index a7777f3361..e0c01a9855 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -208,7 +208,7 @@ mod tests { use crate::cache::flow::table_flownode::{FlowIdent, new_table_flownode_set_cache}; use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; use crate::key::flow::FlowMetadataManager; - use crate::key::flow::flow_info::FlowInfoValue; + use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus}; use crate::key::flow::flow_route::FlowRouteValue; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; @@ -231,6 +231,8 @@ mod tests { 1024, FlowInfoValue { source_table_ids: vec![1024, 
1025], + all_source_table_names: vec![], + unresolved_source_table_names: vec![], sink_table_name: TableName { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), @@ -245,6 +247,8 @@ mod tests { eval_interval_secs: None, comment: "comment".to_string(), options: Default::default(), + status: FlowStatus::Active, + last_activation_error: None, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), }, diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 8fc647433a..42fd8ffe76 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -28,6 +28,7 @@ use crate::node_manager::NodeManagerRef; use crate::region_keeper::MemoryRegionKeeperRef; use crate::region_registry::LeaderRegionRegistryRef; +pub mod activate_flow; pub mod allocator; pub mod alter_database; pub mod alter_logical_tables; diff --git a/src/common/meta/src/ddl/activate_flow.rs b/src/common/meta/src/ddl/activate_flow.rs new file mode 100644 index 0000000000..2f82f50f77 --- /dev/null +++ b/src/common/meta/src/ddl/activate_flow.rs @@ -0,0 +1,449 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::BTreeMap; + +use api::v1::ExpireAfter; +use api::v1::flow::flow_request::Body as PbFlowRequest; +use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader}; +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_procedure::{ + Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, +}; +use common_telemetry::tracing_context::TracingContext; +use futures::future::join_all; +use itertools::Itertools; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use table::metadata::TableId; +use table::table_name::TableName; + +use crate::cache_invalidator::Context; +use crate::ddl::DdlContext; +use crate::ddl::create_flow::FlowType; +use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error}; +use crate::error::{self, Result}; +use crate::instruction::{CacheIdent, CreateFlow}; +use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus}; +use crate::key::flow::flow_route::FlowRouteValue; +use crate::key::table_name::TableNameKey; +use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId}; +use crate::lock_key::{CatalogLock, FlowLock}; +use crate::metrics; +use crate::peer::Peer; + +pub struct ActivatePendingFlowProcedure { + pub context: DdlContext, + pub data: ActivatePendingFlowData, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum ActivatePendingFlowState { + Prepare, + CreateFlows, + UpdateMetadata, + InvalidateFlowCache, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ActivatePendingFlowData { + pub(crate) state: ActivatePendingFlowState, + pub(crate) flow_id: FlowId, + pub(crate) catalog_name: String, + #[serde(default)] + pub(crate) peers: Vec, + #[serde(default)] + pub(crate) resolved_table_ids: Vec, + pub(crate) prev_flow_info_value: Option>, +} + +pub struct PendingFlowResolution { + resolved_table_ids: Vec, + unresolved_source_table_names: Vec, +} + +impl 
ActivatePendingFlowProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::ActivatePendingFlow"; + + pub fn new(flow_id: FlowId, catalog_name: String, context: DdlContext) -> Self { + Self { + context, + data: ActivatePendingFlowData { + state: ActivatePendingFlowState::Prepare, + flow_id, + catalog_name, + peers: vec![], + resolved_table_ids: vec![], + prev_flow_info_value: None, + }, + } + } + + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + Ok(Self { context, data }) + } + + async fn on_prepare(&mut self) -> Result { + let Some(current_flow_info) = self + .context + .flow_metadata_manager + .flow_info_manager() + .get_raw(self.data.flow_id) + .await? + else { + return Ok(Status::done()); + }; + + if current_flow_info.get_inner_ref().is_active() { + return Ok(Status::done()); + } + + let resolution = + resolve_pending_flow_sources(&self.context, current_flow_info.get_inner_ref()).await?; + if !resolution.unresolved_source_table_names.is_empty() { + update_pending_flow_metadata( + &self.context, + self.data.flow_id, + &current_flow_info, + resolution.resolved_table_ids, + resolution.unresolved_source_table_names, + None, + ) + .await?; + return Ok(Status::done()); + } + + if let Some(reason) = + validate_batching_activation(&self.context, &resolution.resolved_table_ids).await? 
+ { + update_pending_flow_metadata( + &self.context, + self.data.flow_id, + &current_flow_info, + resolution.resolved_table_ids, + vec![], + Some(reason), + ) + .await?; + return Ok(Status::done()); + } + + self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?; + self.data.resolved_table_ids = resolution.resolved_table_ids; + self.data.prev_flow_info_value = Some(current_flow_info); + self.data.state = ActivatePendingFlowState::CreateFlows; + + Ok(Status::executing(true)) + } + + async fn on_create_flows(&mut self) -> Result { + let flow_info = + self.data + .prev_flow_info_value + .as_ref() + .context(error::UnexpectedSnafu { + err_msg: "missing previous flow info for activation", + })?; + let request = build_create_request( + self.data.flow_id, + flow_info.get_inner_ref(), + &self.data.resolved_table_ids, + ); + create_flow_on_peers( + &self.context, + &self.data.peers, + request, + flow_info.get_inner_ref(), + ) + .await?; + self.data.state = ActivatePendingFlowState::UpdateMetadata; + Ok(Status::executing(true)) + } + + async fn on_update_metadata(&mut self) -> Result { + let current_flow_info = + self.data + .prev_flow_info_value + .as_ref() + .context(error::UnexpectedSnafu { + err_msg: "missing previous flow info for metadata update", + })?; + let (new_flow_info, flow_routes) = build_active_flow_info( + current_flow_info.get_inner_ref(), + &self.data.peers, + &self.data.resolved_table_ids, + ); + self.context + .flow_metadata_manager + .update_flow_metadata( + self.data.flow_id, + current_flow_info, + &new_flow_info, + flow_routes, + ) + .await?; + self.data.state = ActivatePendingFlowState::InvalidateFlowCache; + Ok(Status::executing(true)) + } + + async fn on_broadcast(&mut self) -> Result { + invalidate_flow_cache( + &self.context, + self.data.flow_id, + &self.data.resolved_table_ids, + &self.data.peers, + ) + .await?; + Ok(Status::done_with_output(self.data.flow_id)) + } +} + +#[async_trait] +impl Procedure for 
ActivatePendingFlowProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.data.state; + let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW + .with_label_values(&[match state { + ActivatePendingFlowState::Prepare => "activate_prepare", + ActivatePendingFlowState::CreateFlows => "activate_create_flows", + ActivatePendingFlowState::UpdateMetadata => "activate_update_metadata", + ActivatePendingFlowState::InvalidateFlowCache => "activate_invalidate_flow_cache", + }]) + .start_timer(); + + match self.data.state { + ActivatePendingFlowState::Prepare => self.on_prepare().await, + ActivatePendingFlowState::CreateFlows => self.on_create_flows().await, + ActivatePendingFlowState::UpdateMetadata => self.on_update_metadata().await, + ActivatePendingFlowState::InvalidateFlowCache => self.on_broadcast().await, + } + .map_err(map_to_procedure_error) + } + + fn dump(&self) -> ProcedureResult { + serde_json::to_string(&self.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + LockKey::new(vec![ + CatalogLock::Read(&self.data.catalog_name).into(), + FlowLock::Write(self.data.flow_id).into(), + ]) + } +} + +async fn resolve_pending_flow_sources( + context: &DdlContext, + flow_info: &FlowInfoValue, +) -> Result { + let keys = flow_info + .all_source_table_names() + .iter() + .map(|name| TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name)) + .collect::>(); + let resolved_tables = context + .table_metadata_manager + .table_name_manager() + .batch_get(keys) + .await?; + + let mut resolved_table_ids = Vec::with_capacity(flow_info.all_source_table_names().len()); + let mut unresolved_source_table_names = Vec::new(); + for (name, table_id) in flow_info + .all_source_table_names() + .iter() + .zip(resolved_tables.into_iter()) + { + match table_id { + Some(table_id) => resolved_table_ids.push(table_id.table_id()), + None => 
unresolved_source_table_names.push(name.clone()), + } + } + + Ok(PendingFlowResolution { + resolved_table_ids, + unresolved_source_table_names, + }) +} + +async fn update_pending_flow_metadata( + context: &DdlContext, + flow_id: FlowId, + current_flow_info: &DeserializedValueWithBytes, + resolved_table_ids: Vec, + unresolved_source_table_names: Vec, + last_activation_error: Option, +) -> Result<()> { + let mut new_flow_info = current_flow_info.get_inner_ref().clone(); + new_flow_info.source_table_ids = resolved_table_ids; + new_flow_info.unresolved_source_table_names = unresolved_source_table_names; + new_flow_info.last_activation_error = last_activation_error; + new_flow_info.updated_time = chrono::Utc::now(); + context + .flow_metadata_manager + .update_flow_metadata(flow_id, current_flow_info, &new_flow_info, vec![]) + .await +} + +async fn validate_batching_activation( + context: &DdlContext, + resolved_table_ids: &[TableId], +) -> Result> { + let table_infos = context + .table_metadata_manager + .table_info_manager() + .batch_get(resolved_table_ids) + .await?; + for source_table_id in resolved_table_ids { + let Some(table_info) = table_infos.get(source_table_id) else { + return Ok(Some(format!( + "Source table metadata is not ready yet, table_id={source_table_id}", + ))); + }; + if table_info.table_info.meta.options.ttl == Some(common_time::TimeToLive::Instant) { + return Ok(Some(format!( + "Source table {} requires streaming activation because ttl=instant", + table_info.table_name() + ))); + } + } + Ok(None) +} + +fn build_create_request( + flow_id: FlowId, + flow_info: &FlowInfoValue, + resolved_table_ids: &[TableId], +) -> CreateRequest { + let mut flow_options = flow_info.options.clone(); + flow_options.insert( + FlowType::FLOW_TYPE_KEY.to_string(), + FlowType::Batching.to_string(), + ); + CreateRequest { + flow_id: Some(api::v1::FlowId { id: flow_id }), + source_table_ids: resolved_table_ids + .iter() + .map(|table_id| api::v1::TableId { id: *table_id }) 
+ .collect_vec(), + sink_table_name: Some(flow_info.sink_table_name.clone().into()), + create_if_not_exists: true, + or_replace: false, + expire_after: flow_info.expire_after.map(|value| ExpireAfter { value }), + eval_interval: flow_info + .eval_interval_secs + .map(|seconds| api::v1::EvalInterval { seconds }), + comment: flow_info.comment.clone(), + sql: flow_info.raw_sql.clone(), + flow_options, + } +} + +async fn create_flow_on_peers( + context: &DdlContext, + peers: &[Peer], + request: CreateRequest, + flow_info: &FlowInfoValue, +) -> Result<()> { + let query_context = flow_info.query_context.clone().unwrap_or_default(); + let mut create_flow_tasks = Vec::with_capacity(peers.len()); + for peer in peers { + let requester = context.node_manager.flownode(peer).await; + let request = FlowRequest { + header: Some(FlowRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + query_context: Some(query_context.clone().into()), + }), + body: Some(PbFlowRequest::Create(request.clone())), + }; + create_flow_tasks.push(async move { + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(peer.clone())) + }); + } + join_all(create_flow_tasks) + .await + .into_iter() + .collect::>>()?; + Ok(()) +} + +fn build_active_flow_info( + current_flow_info: &FlowInfoValue, + peers: &[Peer], + resolved_table_ids: &[TableId], +) -> (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) { + let flownode_ids = peers + .iter() + .enumerate() + .map(|(idx, peer)| (idx as u32, peer.id)) + .collect::>(); + let flow_routes = peers + .iter() + .enumerate() + .map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() })) + .collect::>(); + + let mut new_flow_info = current_flow_info.clone(); + new_flow_info.source_table_ids = resolved_table_ids.to_vec(); + new_flow_info.unresolved_source_table_names = vec![]; + new_flow_info.flownode_ids = flownode_ids; + new_flow_info.status = FlowStatus::Active; + new_flow_info.last_activation_error = 
None; + new_flow_info.options.insert( + FlowType::FLOW_TYPE_KEY.to_string(), + FlowType::Batching.to_string(), + ); + new_flow_info.updated_time = chrono::Utc::now(); + + (new_flow_info, flow_routes) +} + +async fn invalidate_flow_cache( + context: &DdlContext, + flow_id: FlowId, + resolved_table_ids: &[TableId], + peers: &[Peer], +) -> Result<()> { + let ctx = Context { + subject: Some("Invalidate flow cache by activating pending flow".to_string()), + }; + let flow_part2peers = peers + .iter() + .enumerate() + .map(|(idx, peer)| (idx as u32, peer.clone())) + .collect(); + context + .cache_invalidator + .invalidate( + &ctx, + &[ + CacheIdent::CreateFlow(CreateFlow { + flow_id, + source_table_ids: resolved_table_ids.to_vec(), + partition_to_peer_mapping: flow_part2peers, + }), + CacheIdent::FlowId(flow_id), + ], + ) + .await +} diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 7120e50425..69b869faa9 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -34,13 +34,14 @@ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, ensure}; use strum::AsRefStr; use table::metadata::TableId; +use table::table_name::TableName; use crate::cache_invalidator::Context; use crate::ddl::DdlContext; use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error}; use crate::error::{self, Result, UnexpectedSnafu}; use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; -use crate::key::flow::flow_info::FlowInfoValue; +use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus}; use crate::key::flow::flow_route::FlowRouteValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId}; @@ -67,11 +68,13 @@ impl CreateFlowProcedure { flow_id: None, peers: vec![], source_table_ids: vec![], + unresolved_source_table_names: vec![], flow_context: query_context.into(), // Convert to FlowQueryContext state: 
CreateFlowState::Prepare, prev_flow_info_value: None, did_replace: false, flow_type: None, + last_activation_error: None, }, } } @@ -189,15 +192,32 @@ impl CreateFlowProcedure { if self.data.flow_id.is_none() { self.allocate_flow_id().await?; } - self.data.state = CreateFlowState::CreateFlows; - // determine flow type self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?); + if self.data.is_pending() { + self.data.peers.clear(); + self.data.state = CreateFlowState::CreateMetadata; + } else { + if self.data.peers.is_empty() { + self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?; + } + self.data.state = CreateFlowState::CreateFlows; + } + Ok(Status::executing(true)) } async fn on_flownode_create_flows(&mut self) -> Result { - // Safety: must be allocated. + ensure!( + !self.data.peers.is_empty(), + UnexpectedSnafu { + err_msg: format!( + "Flow {:?} enters CreateFlows with no peers allocated", + self.data.flow_id + ), + } + ); + let mut create_flow = Vec::with_capacity(self.data.peers.len()); for peer in &self.data.peers { let requester = self.context.node_manager.flownode(peer).await; @@ -281,20 +301,20 @@ impl CreateFlowProcedure { })]); } - let (_flow_info, flow_routes) = (&self.data).into(); - let flow_part2peers = flow_routes - .into_iter() - .map(|(part_id, route)| (part_id, route.peer)) - .collect(); + if self.data.is_active() { + let (_flow_info, flow_routes) = (&self.data).into(); + let flow_part2peers = flow_routes + .into_iter() + .map(|(part_id, route)| (part_id, route.peer)) + .collect(); - caches.extend([ - CacheIdent::CreateFlow(CreateFlow { + caches.push(CacheIdent::CreateFlow(CreateFlow { flow_id, source_table_ids: self.data.source_table_ids.clone(), partition_to_peer_mapping: flow_part2peers, - }), - CacheIdent::FlowId(flow_id), - ]); + })); + } + caches.push(CacheIdent::FlowId(flow_id)); self.context .cache_invalidator @@ -411,6 +431,8 @@ pub struct CreateFlowData { pub(crate) flow_id: Option, pub(crate) 
peers: Vec, pub(crate) source_table_ids: Vec, + #[serde(default)] + pub(crate) unresolved_source_table_names: Vec, /// Use alias for backward compatibility with QueryContext serialized data #[serde(alias = "query_context")] pub(crate) flow_context: FlowQueryContext, @@ -422,6 +444,18 @@ pub struct CreateFlowData { #[serde(default)] pub(crate) did_replace: bool, pub(crate) flow_type: Option, + #[serde(default)] + pub(crate) last_activation_error: Option, +} + +impl CreateFlowData { + pub(crate) fn is_pending(&self) -> bool { + !self.unresolved_source_table_names.is_empty() + } + + pub(crate) fn is_active(&self) -> bool { + !self.is_pending() + } } impl From<&CreateFlowData> for CreateRequest { @@ -495,6 +529,8 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa let flow_info: FlowInfoValue = FlowInfoValue { source_table_ids: value.source_table_ids.clone(), + all_source_table_names: value.task.source_table_names.clone(), + unresolved_source_table_names: value.unresolved_source_table_names.clone(), sink_table_name, flownode_ids, catalog_name, @@ -506,6 +542,12 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa eval_interval_secs: eval_interval, comment, options, + status: if value.is_pending() { + FlowStatus::PendingSources + } else { + FlowStatus::Active + }, + last_activation_error: value.last_activation_error.clone(), created_time: create_time, updated_time: chrono::Utc::now(), }; diff --git a/src/common/meta/src/ddl/create_flow/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs index 27b85b7946..56d291c318 100644 --- a/src/common/meta/src/ddl/create_flow/metadata.rs +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use snafu::OptionExt; - use crate::ddl::create_flow::CreateFlowProcedure; -use crate::error::{self, Result}; +use crate::error::Result; use crate::key::table_name::TableNameKey; impl CreateFlowProcedure { @@ -36,7 +34,6 @@ impl CreateFlowProcedure { /// Ensures all source tables exist and collects source table ids pub(crate) async fn collect_source_tables(&mut self) -> Result<()> { - // Ensures all source tables exist. let keys = self .data .task @@ -52,22 +49,25 @@ impl CreateFlowProcedure { .batch_get(keys) .await?; - let source_table_ids = self + let mut resolved = Vec::with_capacity(self.data.task.source_table_names.len()); + let mut unresolved = Vec::new(); + + for (name, table_id) in self .data .task .source_table_names .iter() .zip(source_table_ids) - .map(|(name, table_id)| { - Ok(table_id - .with_context(|| error::TableNotFoundSnafu { - table_name: name.to_string(), - })? - .table_id()) - }) - .collect::>>()?; + { + match table_id { + Some(table_id) => resolved.push(table_id.table_id()), + None => unresolved.push(name.clone()), + } + } - self.data.source_table_ids = source_table_ids; + self.data.source_table_ids = resolved; + self.data.unresolved_source_table_names = unresolved; + self.data.last_activation_error = None; Ok(()) } } diff --git a/src/common/meta/src/ddl/flow_meta.rs b/src/common/meta/src/ddl/flow_meta.rs index 85c1f3e989..fd41690b26 100644 --- a/src/common/meta/src/ddl/flow_meta.rs +++ b/src/common/meta/src/ddl/flow_meta.rs @@ -59,8 +59,13 @@ impl FlowMetadataAllocator { /// Allocates the [FlowId] and [Peer]s. 
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec)> { let flow_id = self.allocate_flow_id().await?; - let peers = self.peer_allocator.alloc(partitions).await?; + let peers = self.alloc_peers(partitions).await?; Ok((flow_id, peers)) } + + pub async fn alloc_peers(&self, partitions: usize) -> Result> { + let peers = self.peer_allocator.alloc(partitions).await?; + Ok(peers) + } } diff --git a/src/common/meta/src/ddl/tests.rs b/src/common/meta/src/ddl/tests.rs index 0700259cf8..a09283205b 100644 --- a/src/common/meta/src/ddl/tests.rs +++ b/src/common/meta/src/ddl/tests.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod activate_flow; mod alter_logical_tables; mod alter_table; mod create_flow; diff --git a/src/common/meta/src/ddl/tests/activate_flow.rs b/src/common/meta/src/ddl/tests/activate_flow.rs new file mode 100644 index 0000000000..9d0a514f26 --- /dev/null +++ b/src/common/meta/src/ddl/tests/activate_flow.rs @@ -0,0 +1,158 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_procedure_test::execute_procedure_until_done; +use common_time::TimeToLive; +use table::table_name::TableName; + +use crate::ddl::activate_flow::ActivatePendingFlowProcedure; +use crate::ddl::test_util::create_table::test_create_table_task; +use crate::ddl::tests::create_flow::create_test_flow; +use crate::key::table_route::TableRouteValue; +use crate::test_util::{MockFlownodeManager, new_ddl_context}; + +#[tokio::test] +async fn test_activate_pending_flow() { + let source_table_names = vec![TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "activate_source_table", + )]; + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "activate_sink_table", + ); + let node_manager = Arc::new(MockFlownodeManager::new( + crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler, + )); + let ddl_context = new_ddl_context(node_manager); + + let flow_id = create_test_flow( + &ddl_context, + "activate_pending_flow", + source_table_names.clone(), + sink_table_name, + ) + .await; + + let pending_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + assert!(pending_flow.is_pending()); + + let create_table_task = test_create_table_task("activate_source_table", 1024); + ddl_context + .table_metadata_manager + .create_table_metadata( + create_table_task.table_info.clone(), + TableRouteValue::physical(vec![]), + Default::default(), + ) + .await + .unwrap(); + + let mut procedure = ActivatePendingFlowProcedure::new( + flow_id, + DEFAULT_CATALOG_NAME.to_string(), + ddl_context.clone(), + ); + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let activated_flow_id = output.downcast_ref::().unwrap(); + assert_eq!(*activated_flow_id, flow_id); + + let activated_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) 
+ .await + .unwrap() + .unwrap(); + assert!(activated_flow.is_active()); + assert_eq!(activated_flow.unresolved_source_table_names().len(), 0); + assert_eq!(activated_flow.source_table_ids(), &[1024]); + assert_eq!(activated_flow.last_activation_error(), &None); + assert!(!activated_flow.flownode_ids().is_empty()); +} + +#[tokio::test] +async fn test_activate_pending_flow_require_streaming_keeps_pending() { + let source_table_names = vec![TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "instant_ttl_source_table", + )]; + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "instant_ttl_sink_table", + ); + let node_manager = Arc::new(MockFlownodeManager::new( + crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler, + )); + let ddl_context = new_ddl_context(node_manager); + + let flow_id = create_test_flow( + &ddl_context, + "instant_ttl_pending_flow", + source_table_names.clone(), + sink_table_name, + ) + .await; + + let mut create_table_task = test_create_table_task("instant_ttl_source_table", 1025); + create_table_task.table_info.meta.options.ttl = Some(TimeToLive::Instant); + ddl_context + .table_metadata_manager + .create_table_metadata( + create_table_task.table_info.clone(), + TableRouteValue::physical(vec![]), + Default::default(), + ) + .await + .unwrap(); + + let mut procedure = ActivatePendingFlowProcedure::new( + flow_id, + DEFAULT_CATALOG_NAME.to_string(), + ddl_context.clone(), + ); + assert!(execute_procedure_until_done(&mut procedure).await.is_none()); + + let pending_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + assert!(pending_flow.is_pending()); + assert_eq!(pending_flow.unresolved_source_table_names().len(), 0); + assert_eq!(pending_flow.source_table_ids(), &[1025]); + assert!(pending_flow.flownode_ids().is_empty()); + assert!( + pending_flow + .last_activation_error() + .as_ref() + .unwrap() + .contains("requires 
streaming activation") + ); +} diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index 8803f98e0d..0e5b40c888 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_procedure::Status; use common_procedure_test::execute_procedure_until_done; use table::table_name::TableName; @@ -74,9 +75,23 @@ async fn test_create_flow_source_table_not_found() { let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); let ddl_context = new_ddl_context(node_manager); let query_ctx = test_query_context(); - let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context); - let err = procedure.on_prepare().await.unwrap_err(); - assert_matches!(err, error::Error::TableNotFound { .. }); + let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context.clone()); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true, .. }); + assert_eq!(procedure.data.unresolved_source_table_names.len(), 1); + assert_eq!(procedure.data.source_table_ids, Vec::::new()); + + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let flow_id = *output.downcast_ref::().unwrap(); + let flow_info = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + assert!(flow_info.is_pending()); + assert_eq!(flow_info.unresolved_source_table_names().len(), 1); } pub(crate) async fn create_test_flow( @@ -152,6 +167,83 @@ async fn test_create_flow() { assert_matches!(err, error::Error::FlowAlreadyExists { .. 
}); } +#[tokio::test] +async fn test_replace_pending_flow_activates_with_allocated_peers() { + let source_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_pending_source_table", + ); + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_pending_sink_table", + ); + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + + let pending_flow_id = create_test_flow( + &ddl_context, + "replace_pending_flow", + vec![source_table_name.clone()], + sink_table_name.clone(), + ) + .await; + + let pending_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(pending_flow_id) + .await + .unwrap() + .unwrap(); + assert!(pending_flow.is_pending()); + assert!(pending_flow.flownode_ids().is_empty()); + + let create_table_task = test_create_table_task("replace_pending_source_table", 1026); + ddl_context + .table_metadata_manager + .create_table_metadata( + create_table_task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let mut replace_task = test_create_flow_task( + "replace_pending_flow", + vec![source_table_name], + sink_table_name, + false, + ); + replace_task.or_replace = true; + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(replace_task, query_ctx, ddl_context.clone()); + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let flow_id = *output.downcast_ref::().unwrap(); + assert_eq!(flow_id, pending_flow_id); + + let replaced_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + assert!(replaced_flow.is_active()); + assert_eq!(replaced_flow.source_table_ids(), &[1026]); + assert!(!replaced_flow.flownode_ids().is_empty()); + + let routes = ddl_context + .flow_metadata_manager + .flow_route_manager() + .routes(flow_id) + 
.await + .unwrap(); + assert!(!routes.is_empty()); +} + #[tokio::test] async fn test_create_flow_same_source_and_sink_table() { let table_id = 1024; @@ -259,10 +351,12 @@ fn test_create_flow_data_new_format_serialization() { flow_id: None, peers: vec![], source_table_ids: vec![], + unresolved_source_table_names: vec![], flow_context, prev_flow_info_value: None, did_replace: false, flow_type: None, + last_activation_error: None, }; let serialized = serde_json::to_string(&data).unwrap(); @@ -309,10 +403,12 @@ fn test_flow_info_conversion_with_flow_context() { flow_id: Some(123), peers: vec![], source_table_ids: vec![456, 789], + unresolved_source_table_names: vec![], flow_context, prev_flow_info_value: None, did_replace: false, flow_type: Some(FlowType::Batching), + last_activation_error: None, }; let (flow_info, _routes) = (&data).into(); diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 0106add32f..ad5887bbf1 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -29,6 +29,7 @@ use snafu::{OptionExt, ResultExt, ensure}; use store_api::storage::TableId; use table::table_name::TableName; +use crate::ddl::activate_flow::ActivatePendingFlowProcedure; use crate::ddl::alter_database::AlterDatabaseProcedure; use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure; use crate::ddl::alter_table::AlterTableProcedure; @@ -52,7 +53,7 @@ use crate::error::{ }; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; -use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use crate::key::{DeserializedValueWithBytes, FlowId, TableMetadataManagerRef}; use crate::procedure_executor::ExecutorContext; #[cfg(feature = "enterprise")] use crate::rpc::ddl::DdlTask::CreateTrigger; @@ -232,6 +233,7 @@ impl DdlManager { CreateTableProcedure, CreateLogicalTablesProcedure, CreateViewProcedure, + ActivatePendingFlowProcedure, CreateFlowProcedure, 
AlterTableProcedure, AlterLogicalTablesProcedure, @@ -489,6 +491,17 @@ impl DdlManager { self.execute_procedure_and_wait(procedure_with_id).await } + pub async fn submit_activate_pending_flow_task( + &self, + flow_id: FlowId, + catalog_name: String, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + let procedure = ActivatePendingFlowProcedure::new(flow_id, catalog_name, context); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + self.execute_procedure_and_wait(procedure_with_id).await + } + /// Submits and executes a drop flow task. #[tracing::instrument(skip_all)] pub async fn submit_drop_flow_task( diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 546071f2a0..0b369254fb 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -460,6 +460,8 @@ mod tests { query_context: None, flow_name: flow_name.to_string(), source_table_ids, + all_source_table_names: vec![], + unresolved_source_table_names: vec![], sink_table_name, flownode_ids, raw_sql: "raw".to_string(), @@ -467,6 +469,8 @@ mod tests { eval_interval_secs: None, comment: "hi".to_string(), options: Default::default(), + status: flow_info::FlowStatus::Active, + last_activation_error: None, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), } @@ -635,6 +639,8 @@ mod tests { query_context: None, flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], + all_source_table_names: vec![], + unresolved_source_table_names: vec![], sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), @@ -642,6 +648,8 @@ mod tests { eval_interval_secs: None, comment: "hi".to_string(), options: Default::default(), + status: flow_info::FlowStatus::Active, + last_activation_error: None, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), }; @@ -1012,6 +1020,8 @@ mod tests { query_context: None, flow_name: 
"flow".to_string(), source_table_ids: vec![1024, 1025, 1026], + all_source_table_names: vec![], + unresolved_source_table_names: vec![], sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), @@ -1019,6 +1029,8 @@ mod tests { eval_interval_secs: None, comment: "hi".to_string(), options: Default::default(), + status: flow_info::FlowStatus::Active, + last_activation_error: None, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), }; diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 7b277e36ef..ff59e5311a 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -16,6 +16,8 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use chrono::{DateTime, Utc}; +use futures::TryStreamExt; +use futures::stream::BoxStream; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -27,12 +29,24 @@ use crate::FlownodeId; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue}; +use crate::key::{ + BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue, +}; use crate::kv_backend::KvBackendRef; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; +use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream}; +use crate::rpc::KeyValue; +use crate::rpc::store::RangeRequest; const FLOW_INFO_KEY_PREFIX: &str = "info"; +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub enum FlowStatus { + PendingSources, + #[default] + Active, +} + lazy_static! 
{ static ref FLOW_INFO_KEY_PATTERN: Regex = Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); @@ -114,10 +128,16 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowInfoValue { /// The source tables used by the flow. + #[serde(default)] pub source_table_ids: Vec, + #[serde(default)] + pub all_source_table_names: Vec, + #[serde(default)] + pub unresolved_source_table_names: Vec, /// The sink table used by the flow. pub sink_table_name: TableName, /// Which flow nodes this flow is running on. + #[serde(default)] pub flownode_ids: BTreeMap, /// The catalog name. pub catalog_name: String, @@ -145,6 +165,10 @@ pub struct FlowInfoValue { pub comment: String, /// The options. pub options: HashMap, + #[serde(default)] + pub status: FlowStatus, + #[serde(default)] + pub last_activation_error: Option, /// The created time #[serde(default)] pub created_time: DateTime, @@ -154,6 +178,14 @@ pub struct FlowInfoValue { } impl FlowInfoValue { + pub fn is_pending(&self) -> bool { + self.status == FlowStatus::PendingSources + } + + pub fn is_active(&self) -> bool { + self.status == FlowStatus::Active + } + /// Returns the `flownode_id`. 
pub fn flownode_ids(&self) -> &BTreeMap { &self.flownode_ids @@ -173,6 +205,14 @@ impl FlowInfoValue { &self.source_table_ids } + pub fn all_source_table_names(&self) -> &[TableName] { + &self.all_source_table_names + } + + pub fn unresolved_source_table_names(&self) -> &[TableName] { + &self.unresolved_source_table_names + } + pub fn catalog_name(&self) -> &String { &self.catalog_name } @@ -209,6 +249,14 @@ impl FlowInfoValue { &self.options } + pub fn status(&self) -> &FlowStatus { + &self.status + } + + pub fn last_activation_error(&self) -> &Option { + &self.last_activation_error + } + pub fn created_time(&self) -> &DateTime { &self.created_time } @@ -225,6 +273,12 @@ pub struct FlowInfoManager { kv_backend: KvBackendRef, } +pub fn flow_info_decoder(kv: KeyValue) -> Result<(FlowInfoKey, FlowInfoValue)> { + let key = FlowInfoKey::from_bytes(&kv.key)?; + let value = FlowInfoValue::try_from_raw_value(&kv.value)?; + Ok((key, value)) +} + impl FlowInfoManager { /// Returns a new [FlowInfoManager]. pub fn new(kv_backend: KvBackendRef) -> Self { @@ -254,6 +308,23 @@ impl FlowInfoManager { .transpose() } + pub fn flow_infos(&self) -> BoxStream<'static, Result<(FlowId, FlowInfoValue)>> { + let start_key = FlowScoped::new(BytesAdapter::from( + format!("{FLOW_INFO_KEY_PREFIX}/").into_bytes(), + )) + .to_bytes(); + let req = RangeRequest::new().with_prefix(start_key); + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + flow_info_decoder, + ) + .into_stream(); + + Box::pin(stream.map_ok(|(key, value)| (key.flow_id(), value))) + } + /// Builds a create flow transaction. /// It is expected that the `__flow/info/{flow_id}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. 
@@ -308,6 +379,12 @@ impl FlowInfoManager { #[cfg(test)] mod tests { + use std::collections::{BTreeMap, HashMap}; + + use chrono::Utc; + use serde::Serialize; + use table::table_name::TableName; + use super::*; #[test] @@ -322,4 +399,58 @@ mod tests { let key = FlowInfoKey::from_bytes(&bytes).unwrap(); assert_eq!(key.flow_id(), 2); } + + #[test] + fn test_flow_info_value_backward_compatibility() { + #[derive(Serialize)] + struct OldFlowInfoValue { + source_table_ids: Vec, + sink_table_name: TableName, + flownode_ids: BTreeMap, + catalog_name: String, + query_context: Option, + flow_name: String, + raw_sql: String, + expire_after: Option, + eval_interval_secs: Option, + comment: String, + options: HashMap, + created_time: DateTime, + updated_time: DateTime, + } + + let old_value = OldFlowInfoValue { + source_table_ids: vec![1, 2], + sink_table_name: TableName::new("greptime", "public", "sink"), + flownode_ids: BTreeMap::from([(0, 1)]), + catalog_name: "greptime".to_string(), + query_context: None, + flow_name: "legacy_flow".to_string(), + raw_sql: "select * from t".to_string(), + expire_after: Some(60), + eval_interval_secs: None, + comment: "legacy".to_string(), + options: HashMap::from([("flow_type".to_string(), "batching".to_string())]), + created_time: Utc::now(), + updated_time: Utc::now(), + }; + + let raw = serde_json::to_vec(&old_value).unwrap(); + let decoded = FlowInfoValue::try_from_raw_value(&raw).unwrap(); + + assert_eq!(decoded.source_table_ids, old_value.source_table_ids); + assert_eq!(decoded.sink_table_name, old_value.sink_table_name); + assert_eq!(decoded.flownode_ids, old_value.flownode_ids); + assert_eq!(decoded.catalog_name, old_value.catalog_name); + assert_eq!(decoded.flow_name, old_value.flow_name); + assert_eq!(decoded.raw_sql, old_value.raw_sql); + assert_eq!(decoded.options, old_value.options); + assert_eq!(decoded.all_source_table_names, Vec::::new()); + assert_eq!( + decoded.unresolved_source_table_names, + Vec::::new() + ); + 
assert_eq!(decoded.status, FlowStatus::Active); + assert_eq!(decoded.last_activation_error, None); + } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 7b7983b1ba..abb323ba61 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -135,6 +135,20 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to init pending flow reconcile manager"))] + InitPendingFlowReconcileManager { + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + + #[snafu(display("Failed to reconcile pending flows"))] + PendingFlowReconcile { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to create default catalog and schema"))] InitMetadata { #[snafu(implicit)] @@ -1271,6 +1285,8 @@ impl ErrorExt for Error { Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } | Error::InitReconciliationManager { source, .. } => source.status_code(), + Error::InitPendingFlowReconcileManager { source, .. } => source.status_code(), + Error::PendingFlowReconcile { source, .. } => source.status_code(), Error::BuildTlsOptions { source, .. } => source.status_code(), Error::Other { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/flow.rs b/src/meta-srv/src/flow.rs new file mode 100644 index 0000000000..ea8f9af01c --- /dev/null +++ b/src/meta-srv/src/flow.rs @@ -0,0 +1,120 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use common_meta::ddl_manager::DdlManagerRef; +use common_telemetry::{error, info}; +use futures::TryStreamExt; +use snafu::ResultExt; +use tokio::sync::mpsc::{Receiver, Sender}; + +use crate::define_ticker; +use crate::error::{PendingFlowReconcileSnafu, Result}; + +const PENDING_FLOW_RECONCILE_INTERVAL: Duration = Duration::from_secs(10); + +pub enum Event { + Tick, +} + +pub type PendingFlowReconcileTickerRef = Arc; + +define_ticker!( + PendingFlowReconcileTicker, + event_type = Event, + event_value = Event::Tick +); + +pub struct PendingFlowReconcileManager { + ddl_manager: DdlManagerRef, + receiver: Receiver, +} + +impl PendingFlowReconcileManager { + pub fn new(ddl_manager: DdlManagerRef) -> (Self, PendingFlowReconcileTicker) { + let (sender, receiver) = Self::channel(); + ( + Self { + ddl_manager, + receiver, + }, + PendingFlowReconcileTicker::new(PENDING_FLOW_RECONCILE_INTERVAL, sender), + ) + } + + fn channel() -> (Sender, Receiver) { + tokio::sync::mpsc::channel(8) + } + + pub fn try_start(mut self) -> Result<()> { + common_runtime::spawn_global(async move { self.run().await }); + info!("Pending flow reconcile manager started"); + Ok(()) + } + + async fn run(&mut self) { + while let Some(event) = self.receiver.recv().await { + match event { + Event::Tick => { + if let Err(e) = self.handle_tick().await { + error!(e; "Failed to reconcile pending flows"); + } + } + } + } + } + + async fn handle_tick(&self) -> Result<()> { + let ddl_context = self.ddl_manager.create_context(); + let flow_infos = ddl_context + .flow_metadata_manager + .flow_info_manager() + .flow_infos() + .try_collect::>() + .await + .context(PendingFlowReconcileSnafu)?; + let pending_flows = flow_infos + .into_iter() + .filter_map(|(flow_id, flow_info)| { + flow_info + .is_pending() + .then_some((flow_id, 
flow_info.catalog_name().clone())) + }) + .collect::>(); + + for (flow_id, catalog_name) in pending_flows { + let current_flow_info = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get_raw(flow_id) + .await + .context(PendingFlowReconcileSnafu)?; + let Some(current_flow_info) = current_flow_info else { + continue; + }; + if !current_flow_info.get_inner_ref().is_pending() { + continue; + } + let _ = self + .ddl_manager + .submit_activate_pending_flow_task(flow_id, catalog_name) + .await + .context(PendingFlowReconcileSnafu)?; + } + + Ok(()) + } +} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index c67bc32b40..5d4624d80d 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -25,6 +25,7 @@ pub mod election; pub mod error; pub mod events; mod failure_detector; +pub mod flow; pub mod gc; pub mod handler; pub mod key; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 165efd0555..1c5fd3fbfd 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -70,6 +70,7 @@ use crate::error::{ StartTelemetryTaskSnafu, StopProcedureManagerSnafu, }; use crate::failure_detector::PhiAccrualFailureDetectorOptions; +use crate::flow::PendingFlowReconcileTickerRef; use crate::gc::{GcSchedulerOptions, GcTickerRef}; use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef}; use crate::procedure::ProcedureManagerListenerAdapter; @@ -638,6 +639,7 @@ pub struct Metasrv { region_flush_ticker: Option, table_id_allocator: ResourceIdAllocatorRef, reconciliation_manager: ReconciliationManagerRef, + pending_flow_reconcile_ticker: Option, resource_stat: ResourceStatRef, gc_ticker: Option, @@ -703,6 +705,9 @@ impl Metasrv { if let Some(gc_ticker) = &self.gc_ticker { leadership_change_notifier.add_listener(gc_ticker.clone() as _); } + if let Some(pending_flow_reconcile_ticker) = &self.pending_flow_reconcile_ticker { + 
leadership_change_notifier.add_listener(pending_flow_reconcile_ticker.clone() as _); + } if let Some(customizer) = self.plugins.get::() { customizer.customize(&mut leadership_change_notifier); } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 58039d2de4..717da869bf 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -56,6 +56,7 @@ use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::MetaPeerClientRef; use crate::error::{self, BuildWalProviderSnafu, OtherSnafu, Result}; use crate::events::EventHandlerImpl; +use crate::flow::{PendingFlowReconcileManager, PendingFlowReconcileTickerRef}; use crate::gc::GcScheduler; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::failure_handler::RegionFailureHandler; @@ -556,6 +557,15 @@ impl MetasrvBuilder { .try_start() .context(error::InitReconciliationManagerSnafu)?; + let (pending_flow_reconcile_manager, pending_flow_reconcile_ticker) = + PendingFlowReconcileManager::new(ddl_manager.clone()); + pending_flow_reconcile_manager + .try_start() + .map_err(common_error::ext::BoxedError::new) + .context(error::InitPendingFlowReconcileManagerSnafu)?; + let pending_flow_reconcile_ticker: Option = + Some(Arc::new(pending_flow_reconcile_ticker)); + let mut resource_stat = ResourceStatImpl::default(); resource_stat.start_collect_cpu_usage(); @@ -597,6 +607,7 @@ impl MetasrvBuilder { region_flush_ticker, table_id_allocator, reconciliation_manager, + pending_flow_reconcile_ticker, topic_stats_registry, resource_stat: Arc::new(resource_stat), gc_ticker, diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 0a24eb3713..37e2f9f93b 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -664,6 +664,7 @@ impl StatementExecutor { expr: &CreateFlowExpr, query_ctx: QueryContextRef, ) -> Result { + let mut has_missing_source_table = false; // 
first check if source table's ttl is instant, if it is, force streaming mode for src_table_name in &expr.source_table_names { let table = self @@ -676,14 +677,12 @@ impl StatementExecutor { ) .await .map_err(BoxedError::new) - .context(ExternalSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: format_full_table_name( - &src_table_name.catalog_name, - &src_table_name.schema_name, - &src_table_name.table_name, - ), - })?; + .context(ExternalSnafu)?; + + let Some(table) = table else { + has_missing_source_table = true; + continue; + }; // instant source table can only be handled by streaming mode if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) { @@ -700,6 +699,10 @@ impl StatementExecutor { } } + if has_missing_source_table { + return Ok(FlowType::Batching); + } + let engine = &self.query_engine; let stmts = ParserContext::create_with_dialect( &expr.sql,