feat: pending flow

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-11 20:29:46 +08:00
parent c6f1ef8aec
commit e9fec341ec
18 changed files with 1111 additions and 43 deletions

View File

@@ -208,7 +208,7 @@ mod tests {
use crate::cache::flow::table_flownode::{FlowIdent, new_table_flownode_set_cache};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::FlowMetadataManager;
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
use crate::key::flow::flow_route::FlowRouteValue;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
@@ -231,6 +231,8 @@ mod tests {
1024,
FlowInfoValue {
source_table_ids: vec![1024, 1025],
all_source_table_names: vec![],
unresolved_source_table_names: vec![],
sink_table_name: TableName {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
@@ -245,6 +247,8 @@ mod tests {
eval_interval_secs: None,
comment: "comment".to_string(),
options: Default::default(),
status: FlowStatus::Active,
last_activation_error: None,
created_time: chrono::Utc::now(),
updated_time: chrono::Utc::now(),
},

View File

@@ -28,6 +28,7 @@ use crate::node_manager::NodeManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::region_registry::LeaderRegionRegistryRef;
pub mod activate_flow;
pub mod allocator;
pub mod alter_database;
pub mod alter_logical_tables;

View File

@@ -0,0 +1,449 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use api::v1::ExpireAfter;
use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use table::table_name::TableName;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::ddl::create_flow::FlowType;
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
use crate::error::{self, Result};
use crate::instruction::{CacheIdent, CreateFlow};
use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
use crate::lock_key::{CatalogLock, FlowLock};
use crate::metrics;
use crate::peer::Peer;
pub struct ActivatePendingFlowProcedure {
pub context: DdlContext,
pub data: ActivatePendingFlowData,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ActivatePendingFlowState {
Prepare,
CreateFlows,
UpdateMetadata,
InvalidateFlowCache,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ActivatePendingFlowData {
pub(crate) state: ActivatePendingFlowState,
pub(crate) flow_id: FlowId,
pub(crate) catalog_name: String,
#[serde(default)]
pub(crate) peers: Vec<Peer>,
#[serde(default)]
pub(crate) resolved_table_ids: Vec<TableId>,
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
}
pub struct PendingFlowResolution {
resolved_table_ids: Vec<TableId>,
unresolved_source_table_names: Vec<TableName>,
}
impl ActivatePendingFlowProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::ActivatePendingFlow";
pub fn new(flow_id: FlowId, catalog_name: String, context: DdlContext) -> Self {
Self {
context,
data: ActivatePendingFlowData {
state: ActivatePendingFlowState::Prepare,
flow_id,
catalog_name,
peers: vec![],
resolved_table_ids: vec![],
prev_flow_info_value: None,
},
}
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}
async fn on_prepare(&mut self) -> Result<Status> {
let Some(current_flow_info) = self
.context
.flow_metadata_manager
.flow_info_manager()
.get_raw(self.data.flow_id)
.await?
else {
return Ok(Status::done());
};
if current_flow_info.get_inner_ref().is_active() {
return Ok(Status::done());
}
let resolution =
resolve_pending_flow_sources(&self.context, current_flow_info.get_inner_ref()).await?;
if !resolution.unresolved_source_table_names.is_empty() {
update_pending_flow_metadata(
&self.context,
self.data.flow_id,
&current_flow_info,
resolution.resolved_table_ids,
resolution.unresolved_source_table_names,
None,
)
.await?;
return Ok(Status::done());
}
if let Some(reason) =
validate_batching_activation(&self.context, &resolution.resolved_table_ids).await?
{
update_pending_flow_metadata(
&self.context,
self.data.flow_id,
&current_flow_info,
resolution.resolved_table_ids,
vec![],
Some(reason),
)
.await?;
return Ok(Status::done());
}
self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?;
self.data.resolved_table_ids = resolution.resolved_table_ids;
self.data.prev_flow_info_value = Some(current_flow_info);
self.data.state = ActivatePendingFlowState::CreateFlows;
Ok(Status::executing(true))
}
async fn on_create_flows(&mut self) -> Result<Status> {
let flow_info =
self.data
.prev_flow_info_value
.as_ref()
.context(error::UnexpectedSnafu {
err_msg: "missing previous flow info for activation",
})?;
let request = build_create_request(
self.data.flow_id,
flow_info.get_inner_ref(),
&self.data.resolved_table_ids,
);
create_flow_on_peers(
&self.context,
&self.data.peers,
request,
flow_info.get_inner_ref(),
)
.await?;
self.data.state = ActivatePendingFlowState::UpdateMetadata;
Ok(Status::executing(true))
}
async fn on_update_metadata(&mut self) -> Result<Status> {
let current_flow_info =
self.data
.prev_flow_info_value
.as_ref()
.context(error::UnexpectedSnafu {
err_msg: "missing previous flow info for metadata update",
})?;
let (new_flow_info, flow_routes) = build_active_flow_info(
current_flow_info.get_inner_ref(),
&self.data.peers,
&self.data.resolved_table_ids,
);
self.context
.flow_metadata_manager
.update_flow_metadata(
self.data.flow_id,
current_flow_info,
&new_flow_info,
flow_routes,
)
.await?;
self.data.state = ActivatePendingFlowState::InvalidateFlowCache;
Ok(Status::executing(true))
}
async fn on_broadcast(&mut self) -> Result<Status> {
invalidate_flow_cache(
&self.context,
self.data.flow_id,
&self.data.resolved_table_ids,
&self.data.peers,
)
.await?;
Ok(Status::done_with_output(self.data.flow_id))
}
}
#[async_trait]
impl Procedure for ActivatePendingFlowProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
.with_label_values(&[match state {
ActivatePendingFlowState::Prepare => "activate_prepare",
ActivatePendingFlowState::CreateFlows => "activate_create_flows",
ActivatePendingFlowState::UpdateMetadata => "activate_update_metadata",
ActivatePendingFlowState::InvalidateFlowCache => "activate_invalidate_flow_cache",
}])
.start_timer();
match self.data.state {
ActivatePendingFlowState::Prepare => self.on_prepare().await,
ActivatePendingFlowState::CreateFlows => self.on_create_flows().await,
ActivatePendingFlowState::UpdateMetadata => self.on_update_metadata().await,
ActivatePendingFlowState::InvalidateFlowCache => self.on_broadcast().await,
}
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
LockKey::new(vec![
CatalogLock::Read(&self.data.catalog_name).into(),
FlowLock::Write(self.data.flow_id).into(),
])
}
}
async fn resolve_pending_flow_sources(
context: &DdlContext,
flow_info: &FlowInfoValue,
) -> Result<PendingFlowResolution> {
let keys = flow_info
.all_source_table_names()
.iter()
.map(|name| TableNameKey::new(&name.catalog_name, &name.schema_name, &name.table_name))
.collect::<Vec<_>>();
let resolved_tables = context
.table_metadata_manager
.table_name_manager()
.batch_get(keys)
.await?;
let mut resolved_table_ids = Vec::with_capacity(flow_info.all_source_table_names().len());
let mut unresolved_source_table_names = Vec::new();
for (name, table_id) in flow_info
.all_source_table_names()
.iter()
.zip(resolved_tables.into_iter())
{
match table_id {
Some(table_id) => resolved_table_ids.push(table_id.table_id()),
None => unresolved_source_table_names.push(name.clone()),
}
}
Ok(PendingFlowResolution {
resolved_table_ids,
unresolved_source_table_names,
})
}
async fn update_pending_flow_metadata(
context: &DdlContext,
flow_id: FlowId,
current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
resolved_table_ids: Vec<TableId>,
unresolved_source_table_names: Vec<TableName>,
last_activation_error: Option<String>,
) -> Result<()> {
let mut new_flow_info = current_flow_info.get_inner_ref().clone();
new_flow_info.source_table_ids = resolved_table_ids;
new_flow_info.unresolved_source_table_names = unresolved_source_table_names;
new_flow_info.last_activation_error = last_activation_error;
new_flow_info.updated_time = chrono::Utc::now();
context
.flow_metadata_manager
.update_flow_metadata(flow_id, current_flow_info, &new_flow_info, vec![])
.await
}
async fn validate_batching_activation(
context: &DdlContext,
resolved_table_ids: &[TableId],
) -> Result<Option<String>> {
let table_infos = context
.table_metadata_manager
.table_info_manager()
.batch_get(resolved_table_ids)
.await?;
for source_table_id in resolved_table_ids {
let Some(table_info) = table_infos.get(source_table_id) else {
return Ok(Some(format!(
"Source table metadata is not ready yet, table_id={source_table_id}",
)));
};
if table_info.table_info.meta.options.ttl == Some(common_time::TimeToLive::Instant) {
return Ok(Some(format!(
"Source table {} requires streaming activation because ttl=instant",
table_info.table_name()
)));
}
}
Ok(None)
}
fn build_create_request(
flow_id: FlowId,
flow_info: &FlowInfoValue,
resolved_table_ids: &[TableId],
) -> CreateRequest {
let mut flow_options = flow_info.options.clone();
flow_options.insert(
FlowType::FLOW_TYPE_KEY.to_string(),
FlowType::Batching.to_string(),
);
CreateRequest {
flow_id: Some(api::v1::FlowId { id: flow_id }),
source_table_ids: resolved_table_ids
.iter()
.map(|table_id| api::v1::TableId { id: *table_id })
.collect_vec(),
sink_table_name: Some(flow_info.sink_table_name.clone().into()),
create_if_not_exists: true,
or_replace: false,
expire_after: flow_info.expire_after.map(|value| ExpireAfter { value }),
eval_interval: flow_info
.eval_interval_secs
.map(|seconds| api::v1::EvalInterval { seconds }),
comment: flow_info.comment.clone(),
sql: flow_info.raw_sql.clone(),
flow_options,
}
}
async fn create_flow_on_peers(
context: &DdlContext,
peers: &[Peer],
request: CreateRequest,
flow_info: &FlowInfoValue,
) -> Result<()> {
let query_context = flow_info.query_context.clone().unwrap_or_default();
let mut create_flow_tasks = Vec::with_capacity(peers.len());
for peer in peers {
let requester = context.node_manager.flownode(peer).await;
let request = FlowRequest {
header: Some(FlowRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
query_context: Some(query_context.clone().into()),
}),
body: Some(PbFlowRequest::Create(request.clone())),
};
create_flow_tasks.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer.clone()))
});
}
join_all(create_flow_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}
fn build_active_flow_info(
current_flow_info: &FlowInfoValue,
peers: &[Peer],
resolved_table_ids: &[TableId],
) -> (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
let flownode_ids = peers
.iter()
.enumerate()
.map(|(idx, peer)| (idx as u32, peer.id))
.collect::<BTreeMap<_, _>>();
let flow_routes = peers
.iter()
.enumerate()
.map(|(idx, peer)| (idx as u32, FlowRouteValue { peer: peer.clone() }))
.collect::<Vec<_>>();
let mut new_flow_info = current_flow_info.clone();
new_flow_info.source_table_ids = resolved_table_ids.to_vec();
new_flow_info.unresolved_source_table_names = vec![];
new_flow_info.flownode_ids = flownode_ids;
new_flow_info.status = FlowStatus::Active;
new_flow_info.last_activation_error = None;
new_flow_info.options.insert(
FlowType::FLOW_TYPE_KEY.to_string(),
FlowType::Batching.to_string(),
);
new_flow_info.updated_time = chrono::Utc::now();
(new_flow_info, flow_routes)
}
async fn invalidate_flow_cache(
context: &DdlContext,
flow_id: FlowId,
resolved_table_ids: &[TableId],
peers: &[Peer],
) -> Result<()> {
let ctx = Context {
subject: Some("Invalidate flow cache by activating pending flow".to_string()),
};
let flow_part2peers = peers
.iter()
.enumerate()
.map(|(idx, peer)| (idx as u32, peer.clone()))
.collect();
context
.cache_invalidator
.invalidate(
&ctx,
&[
CacheIdent::CreateFlow(CreateFlow {
flow_id,
source_table_ids: resolved_table_ids.to_vec(),
partition_to_peer_mapping: flow_part2peers,
}),
CacheIdent::FlowId(flow_id),
],
)
.await
}

View File

@@ -34,13 +34,14 @@ use serde::{Deserialize, Serialize};
use snafu::{ResultExt, ensure};
use strum::AsRefStr;
use table::metadata::TableId;
use table::table_name::TableName;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
@@ -67,11 +68,13 @@ impl CreateFlowProcedure {
flow_id: None,
peers: vec![],
source_table_ids: vec![],
unresolved_source_table_names: vec![],
flow_context: query_context.into(), // Convert to FlowQueryContext
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
did_replace: false,
flow_type: None,
last_activation_error: None,
},
}
}
@@ -189,15 +192,32 @@ impl CreateFlowProcedure {
if self.data.flow_id.is_none() {
self.allocate_flow_id().await?;
}
self.data.state = CreateFlowState::CreateFlows;
// determine flow type
self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
if self.data.is_pending() {
self.data.peers.clear();
self.data.state = CreateFlowState::CreateMetadata;
} else {
if self.data.peers.is_empty() {
self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?;
}
self.data.state = CreateFlowState::CreateFlows;
}
Ok(Status::executing(true))
}
async fn on_flownode_create_flows(&mut self) -> Result<Status> {
// Safety: must be allocated.
ensure!(
!self.data.peers.is_empty(),
UnexpectedSnafu {
err_msg: format!(
"Flow {:?} enters CreateFlows with no peers allocated",
self.data.flow_id
),
}
);
let mut create_flow = Vec::with_capacity(self.data.peers.len());
for peer in &self.data.peers {
let requester = self.context.node_manager.flownode(peer).await;
@@ -281,20 +301,20 @@ impl CreateFlowProcedure {
})]);
}
let (_flow_info, flow_routes) = (&self.data).into();
let flow_part2peers = flow_routes
.into_iter()
.map(|(part_id, route)| (part_id, route.peer))
.collect();
if self.data.is_active() {
let (_flow_info, flow_routes) = (&self.data).into();
let flow_part2peers = flow_routes
.into_iter()
.map(|(part_id, route)| (part_id, route.peer))
.collect();
caches.extend([
CacheIdent::CreateFlow(CreateFlow {
caches.push(CacheIdent::CreateFlow(CreateFlow {
flow_id,
source_table_ids: self.data.source_table_ids.clone(),
partition_to_peer_mapping: flow_part2peers,
}),
CacheIdent::FlowId(flow_id),
]);
}));
}
caches.push(CacheIdent::FlowId(flow_id));
self.context
.cache_invalidator
@@ -411,6 +431,8 @@ pub struct CreateFlowData {
pub(crate) flow_id: Option<FlowId>,
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
#[serde(default)]
pub(crate) unresolved_source_table_names: Vec<TableName>,
/// Use alias for backward compatibility with QueryContext serialized data
#[serde(alias = "query_context")]
pub(crate) flow_context: FlowQueryContext,
@@ -422,6 +444,18 @@ pub struct CreateFlowData {
#[serde(default)]
pub(crate) did_replace: bool,
pub(crate) flow_type: Option<FlowType>,
#[serde(default)]
pub(crate) last_activation_error: Option<String>,
}
impl CreateFlowData {
pub(crate) fn is_pending(&self) -> bool {
!self.unresolved_source_table_names.is_empty()
}
pub(crate) fn is_active(&self) -> bool {
!self.is_pending()
}
}
impl From<&CreateFlowData> for CreateRequest {
@@ -495,6 +529,8 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
let flow_info: FlowInfoValue = FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
all_source_table_names: value.task.source_table_names.clone(),
unresolved_source_table_names: value.unresolved_source_table_names.clone(),
sink_table_name,
flownode_ids,
catalog_name,
@@ -506,6 +542,12 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
eval_interval_secs: eval_interval,
comment,
options,
status: if value.is_pending() {
FlowStatus::PendingSources
} else {
FlowStatus::Active
},
last_activation_error: value.last_activation_error.clone(),
created_time: create_time,
updated_time: chrono::Utc::now(),
};

View File

@@ -12,10 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::OptionExt;
use crate::ddl::create_flow::CreateFlowProcedure;
use crate::error::{self, Result};
use crate::error::Result;
use crate::key::table_name::TableNameKey;
impl CreateFlowProcedure {
@@ -36,7 +34,6 @@ impl CreateFlowProcedure {
/// Ensures all source tables exist and collects source table ids
pub(crate) async fn collect_source_tables(&mut self) -> Result<()> {
// Ensures all source tables exist.
let keys = self
.data
.task
@@ -52,22 +49,25 @@ impl CreateFlowProcedure {
.batch_get(keys)
.await?;
let source_table_ids = self
let mut resolved = Vec::with_capacity(self.data.task.source_table_names.len());
let mut unresolved = Vec::new();
for (name, table_id) in self
.data
.task
.source_table_names
.iter()
.zip(source_table_ids)
.map(|(name, table_id)| {
Ok(table_id
.with_context(|| error::TableNotFoundSnafu {
table_name: name.to_string(),
})?
.table_id())
})
.collect::<Result<Vec<_>>>()?;
{
match table_id {
Some(table_id) => resolved.push(table_id.table_id()),
None => unresolved.push(name.clone()),
}
}
self.data.source_table_ids = source_table_ids;
self.data.source_table_ids = resolved;
self.data.unresolved_source_table_names = unresolved;
self.data.last_activation_error = None;
Ok(())
}
}

View File

@@ -59,8 +59,13 @@ impl FlowMetadataAllocator {
/// Allocates the [FlowId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.peer_allocator.alloc(partitions).await?;
let peers = self.alloc_peers(partitions).await?;
Ok((flow_id, peers))
}
pub async fn alloc_peers(&self, partitions: usize) -> Result<Vec<Peer>> {
let peers = self.peer_allocator.alloc(partitions).await?;
Ok(peers)
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod activate_flow;
mod alter_logical_tables;
mod alter_table;
mod create_flow;

View File

@@ -0,0 +1,158 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure_test::execute_procedure_until_done;
use common_time::TimeToLive;
use table::table_name::TableName;
use crate::ddl::activate_flow::ActivatePendingFlowProcedure;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::tests::create_flow::create_test_flow;
use crate::key::table_route::TableRouteValue;
use crate::test_util::{MockFlownodeManager, new_ddl_context};
#[tokio::test]
async fn test_activate_pending_flow() {
let source_table_names = vec![TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"activate_source_table",
)];
let sink_table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"activate_sink_table",
);
let node_manager = Arc::new(MockFlownodeManager::new(
crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler,
));
let ddl_context = new_ddl_context(node_manager);
let flow_id = create_test_flow(
&ddl_context,
"activate_pending_flow",
source_table_names.clone(),
sink_table_name,
)
.await;
let pending_flow = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
assert!(pending_flow.is_pending());
let create_table_task = test_create_table_task("activate_source_table", 1024);
ddl_context
.table_metadata_manager
.create_table_metadata(
create_table_task.table_info.clone(),
TableRouteValue::physical(vec![]),
Default::default(),
)
.await
.unwrap();
let mut procedure = ActivatePendingFlowProcedure::new(
flow_id,
DEFAULT_CATALOG_NAME.to_string(),
ddl_context.clone(),
);
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let activated_flow_id = output.downcast_ref::<u32>().unwrap();
assert_eq!(*activated_flow_id, flow_id);
let activated_flow = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
assert!(activated_flow.is_active());
assert_eq!(activated_flow.unresolved_source_table_names().len(), 0);
assert_eq!(activated_flow.source_table_ids(), &[1024]);
assert_eq!(activated_flow.last_activation_error(), &None);
assert!(!activated_flow.flownode_ids().is_empty());
}
#[tokio::test]
async fn test_activate_pending_flow_require_streaming_keeps_pending() {
let source_table_names = vec![TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"instant_ttl_source_table",
)];
let sink_table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"instant_ttl_sink_table",
);
let node_manager = Arc::new(MockFlownodeManager::new(
crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler,
));
let ddl_context = new_ddl_context(node_manager);
let flow_id = create_test_flow(
&ddl_context,
"instant_ttl_pending_flow",
source_table_names.clone(),
sink_table_name,
)
.await;
let mut create_table_task = test_create_table_task("instant_ttl_source_table", 1025);
create_table_task.table_info.meta.options.ttl = Some(TimeToLive::Instant);
ddl_context
.table_metadata_manager
.create_table_metadata(
create_table_task.table_info.clone(),
TableRouteValue::physical(vec![]),
Default::default(),
)
.await
.unwrap();
let mut procedure = ActivatePendingFlowProcedure::new(
flow_id,
DEFAULT_CATALOG_NAME.to_string(),
ddl_context.clone(),
);
assert!(execute_procedure_until_done(&mut procedure).await.is_none());
let pending_flow = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
assert!(pending_flow.is_pending());
assert_eq!(pending_flow.unresolved_source_table_names().len(), 0);
assert_eq!(pending_flow.source_table_ids(), &[1025]);
assert!(pending_flow.flownode_ids().is_empty());
assert!(
pending_flow
.last_activation_error()
.as_ref()
.unwrap()
.contains("requires streaming activation")
);
}

View File

@@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::Status;
use common_procedure_test::execute_procedure_until_done;
use table::table_name::TableName;
@@ -74,9 +75,23 @@ async fn test_create_flow_source_table_not_found() {
let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
let ddl_context = new_ddl_context(node_manager);
let query_ctx = test_query_context();
let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, error::Error::TableNotFound { .. });
let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context.clone());
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true, .. });
assert_eq!(procedure.data.unresolved_source_table_names.len(), 1);
assert_eq!(procedure.data.source_table_ids, Vec::<u32>::new());
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let flow_id = *output.downcast_ref::<FlowId>().unwrap();
let flow_info = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
assert!(flow_info.is_pending());
assert_eq!(flow_info.unresolved_source_table_names().len(), 1);
}
pub(crate) async fn create_test_flow(
@@ -152,6 +167,83 @@ async fn test_create_flow() {
assert_matches!(err, error::Error::FlowAlreadyExists { .. });
}
#[tokio::test]
async fn test_replace_pending_flow_activates_with_allocated_peers() {
let source_table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"replace_pending_source_table",
);
let sink_table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"replace_pending_sink_table",
);
let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
let ddl_context = new_ddl_context(node_manager);
let pending_flow_id = create_test_flow(
&ddl_context,
"replace_pending_flow",
vec![source_table_name.clone()],
sink_table_name.clone(),
)
.await;
let pending_flow = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get(pending_flow_id)
.await
.unwrap()
.unwrap();
assert!(pending_flow.is_pending());
assert!(pending_flow.flownode_ids().is_empty());
let create_table_task = test_create_table_task("replace_pending_source_table", 1026);
ddl_context
.table_metadata_manager
.create_table_metadata(
create_table_task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
let mut replace_task = test_create_flow_task(
"replace_pending_flow",
vec![source_table_name],
sink_table_name,
false,
);
replace_task.or_replace = true;
let query_ctx = test_query_context();
let mut procedure = CreateFlowProcedure::new(replace_task, query_ctx, ddl_context.clone());
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let flow_id = *output.downcast_ref::<FlowId>().unwrap();
assert_eq!(flow_id, pending_flow_id);
let replaced_flow = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
assert!(replaced_flow.is_active());
assert_eq!(replaced_flow.source_table_ids(), &[1026]);
assert!(!replaced_flow.flownode_ids().is_empty());
let routes = ddl_context
.flow_metadata_manager
.flow_route_manager()
.routes(flow_id)
.await
.unwrap();
assert!(!routes.is_empty());
}
#[tokio::test]
async fn test_create_flow_same_source_and_sink_table() {
let table_id = 1024;
@@ -259,10 +351,12 @@ fn test_create_flow_data_new_format_serialization() {
flow_id: None,
peers: vec![],
source_table_ids: vec![],
unresolved_source_table_names: vec![],
flow_context,
prev_flow_info_value: None,
did_replace: false,
flow_type: None,
last_activation_error: None,
};
let serialized = serde_json::to_string(&data).unwrap();
@@ -309,10 +403,12 @@ fn test_flow_info_conversion_with_flow_context() {
flow_id: Some(123),
peers: vec![],
source_table_ids: vec![456, 789],
unresolved_source_table_names: vec![],
flow_context,
prev_flow_info_value: None,
did_replace: false,
flow_type: Some(FlowType::Batching),
last_activation_error: None,
};
let (flow_info, _routes) = (&data).into();

View File

@@ -29,6 +29,7 @@ use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::TableId;
use table::table_name::TableName;
use crate::ddl::activate_flow::ActivatePendingFlowProcedure;
use crate::ddl::alter_database::AlterDatabaseProcedure;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
@@ -52,7 +53,7 @@ use crate::error::{
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::key::{DeserializedValueWithBytes, FlowId, TableMetadataManagerRef};
use crate::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::CreateTrigger;
@@ -232,6 +233,7 @@ impl DdlManager {
CreateTableProcedure,
CreateLogicalTablesProcedure,
CreateViewProcedure,
ActivatePendingFlowProcedure,
CreateFlowProcedure,
AlterTableProcedure,
AlterLogicalTablesProcedure,
@@ -489,6 +491,17 @@ impl DdlManager {
self.execute_procedure_and_wait(procedure_with_id).await
}
pub async fn submit_activate_pending_flow_task(
&self,
flow_id: FlowId,
catalog_name: String,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = ActivatePendingFlowProcedure::new(flow_id, catalog_name, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.execute_procedure_and_wait(procedure_with_id).await
}
/// Submits and executes a drop flow task.
#[tracing::instrument(skip_all)]
pub async fn submit_drop_flow_task(

View File

@@ -460,6 +460,8 @@ mod tests {
query_context: None,
flow_name: flow_name.to_string(),
source_table_ids,
all_source_table_names: vec![],
unresolved_source_table_names: vec![],
sink_table_name,
flownode_ids,
raw_sql: "raw".to_string(),
@@ -467,6 +469,8 @@ mod tests {
eval_interval_secs: None,
comment: "hi".to_string(),
options: Default::default(),
status: flow_info::FlowStatus::Active,
last_activation_error: None,
created_time: chrono::Utc::now(),
updated_time: chrono::Utc::now(),
}
@@ -635,6 +639,8 @@ mod tests {
query_context: None,
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
all_source_table_names: vec![],
unresolved_source_table_names: vec![],
sink_table_name: another_sink_table_name,
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
@@ -642,6 +648,8 @@ mod tests {
eval_interval_secs: None,
comment: "hi".to_string(),
options: Default::default(),
status: flow_info::FlowStatus::Active,
last_activation_error: None,
created_time: chrono::Utc::now(),
updated_time: chrono::Utc::now(),
};
@@ -1012,6 +1020,8 @@ mod tests {
query_context: None,
flow_name: "flow".to_string(),
source_table_ids: vec![1024, 1025, 1026],
all_source_table_names: vec![],
unresolved_source_table_names: vec![],
sink_table_name: another_sink_table_name,
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
@@ -1019,6 +1029,8 @@ mod tests {
eval_interval_secs: None,
comment: "hi".to_string(),
options: Default::default(),
status: flow_info::FlowStatus::Active,
last_activation_error: None,
created_time: chrono::Utc::now(),
updated_time: chrono::Utc::now(),
};

View File

@@ -16,6 +16,8 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
@@ -27,12 +29,24 @@ use crate::FlownodeId;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue};
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue,
};
use crate::kv_backend::KvBackendRef;
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use crate::rpc::KeyValue;
use crate::rpc::store::RangeRequest;
const FLOW_INFO_KEY_PREFIX: &str = "info";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum FlowStatus {
PendingSources,
#[default]
Active,
}
lazy_static! {
static ref FLOW_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
@@ -114,10 +128,16 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FlowInfoValue {
/// The source tables used by the flow.
#[serde(default)]
pub source_table_ids: Vec<TableId>,
#[serde(default)]
pub all_source_table_names: Vec<TableName>,
#[serde(default)]
pub unresolved_source_table_names: Vec<TableName>,
/// The sink table used by the flow.
pub sink_table_name: TableName,
/// Which flow nodes this flow is running on.
#[serde(default)]
pub flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
/// The catalog name.
pub catalog_name: String,
@@ -145,6 +165,10 @@ pub struct FlowInfoValue {
pub comment: String,
/// The options.
pub options: HashMap<String, String>,
#[serde(default)]
pub status: FlowStatus,
#[serde(default)]
pub last_activation_error: Option<String>,
/// The created time
#[serde(default)]
pub created_time: DateTime<Utc>,
@@ -154,6 +178,14 @@ pub struct FlowInfoValue {
}
impl FlowInfoValue {
pub fn is_pending(&self) -> bool {
self.status == FlowStatus::PendingSources
}
pub fn is_active(&self) -> bool {
self.status == FlowStatus::Active
}
/// Returns the `flownode_id`.
pub fn flownode_ids(&self) -> &BTreeMap<FlowPartitionId, FlownodeId> {
&self.flownode_ids
@@ -173,6 +205,14 @@ impl FlowInfoValue {
&self.source_table_ids
}
pub fn all_source_table_names(&self) -> &[TableName] {
&self.all_source_table_names
}
pub fn unresolved_source_table_names(&self) -> &[TableName] {
&self.unresolved_source_table_names
}
pub fn catalog_name(&self) -> &String {
&self.catalog_name
}
@@ -209,6 +249,14 @@ impl FlowInfoValue {
&self.options
}
pub fn status(&self) -> &FlowStatus {
&self.status
}
pub fn last_activation_error(&self) -> &Option<String> {
&self.last_activation_error
}
pub fn created_time(&self) -> &DateTime<Utc> {
&self.created_time
}
@@ -225,6 +273,12 @@ pub struct FlowInfoManager {
kv_backend: KvBackendRef,
}
pub fn flow_info_decoder(kv: KeyValue) -> Result<(FlowInfoKey, FlowInfoValue)> {
let key = FlowInfoKey::from_bytes(&kv.key)?;
let value = FlowInfoValue::try_from_raw_value(&kv.value)?;
Ok((key, value))
}
impl FlowInfoManager {
/// Returns a new [FlowInfoManager].
pub fn new(kv_backend: KvBackendRef) -> Self {
@@ -254,6 +308,23 @@ impl FlowInfoManager {
.transpose()
}
pub fn flow_infos(&self) -> BoxStream<'static, Result<(FlowId, FlowInfoValue)>> {
let start_key = FlowScoped::new(BytesAdapter::from(
format!("{FLOW_INFO_KEY_PREFIX}/").into_bytes(),
))
.to_bytes();
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
flow_info_decoder,
)
.into_stream();
Box::pin(stream.map_ok(|(key, value)| (key.flow_id(), value)))
}
/// Builds a create flow transaction.
/// It is expected that the `__flow/info/{flow_id}` wasn't occupied.
/// Otherwise, the transaction will retrieve existing value.
@@ -308,6 +379,12 @@ impl FlowInfoManager {
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, HashMap};
use chrono::Utc;
use serde::Serialize;
use table::table_name::TableName;
use super::*;
#[test]
@@ -322,4 +399,58 @@ mod tests {
let key = FlowInfoKey::from_bytes(&bytes).unwrap();
assert_eq!(key.flow_id(), 2);
}
#[test]
fn test_flow_info_value_backward_compatibility() {
#[derive(Serialize)]
struct OldFlowInfoValue {
source_table_ids: Vec<TableId>,
sink_table_name: TableName,
flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
catalog_name: String,
query_context: Option<crate::rpc::ddl::QueryContext>,
flow_name: String,
raw_sql: String,
expire_after: Option<i64>,
eval_interval_secs: Option<i64>,
comment: String,
options: HashMap<String, String>,
created_time: DateTime<Utc>,
updated_time: DateTime<Utc>,
}
let old_value = OldFlowInfoValue {
source_table_ids: vec![1, 2],
sink_table_name: TableName::new("greptime", "public", "sink"),
flownode_ids: BTreeMap::from([(0, 1)]),
catalog_name: "greptime".to_string(),
query_context: None,
flow_name: "legacy_flow".to_string(),
raw_sql: "select * from t".to_string(),
expire_after: Some(60),
eval_interval_secs: None,
comment: "legacy".to_string(),
options: HashMap::from([("flow_type".to_string(), "batching".to_string())]),
created_time: Utc::now(),
updated_time: Utc::now(),
};
let raw = serde_json::to_vec(&old_value).unwrap();
let decoded = FlowInfoValue::try_from_raw_value(&raw).unwrap();
assert_eq!(decoded.source_table_ids, old_value.source_table_ids);
assert_eq!(decoded.sink_table_name, old_value.sink_table_name);
assert_eq!(decoded.flownode_ids, old_value.flownode_ids);
assert_eq!(decoded.catalog_name, old_value.catalog_name);
assert_eq!(decoded.flow_name, old_value.flow_name);
assert_eq!(decoded.raw_sql, old_value.raw_sql);
assert_eq!(decoded.options, old_value.options);
assert_eq!(decoded.all_source_table_names, Vec::<TableName>::new());
assert_eq!(
decoded.unresolved_source_table_names,
Vec::<TableName>::new()
);
assert_eq!(decoded.status, FlowStatus::Active);
assert_eq!(decoded.last_activation_error, None);
}
}

View File

@@ -135,6 +135,20 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to init pending flow reconcile manager"))]
InitPendingFlowReconcileManager {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to reconcile pending flows"))]
PendingFlowReconcile {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
#[snafu(implicit)]
@@ -1271,6 +1285,8 @@ impl ErrorExt for Error {
Error::InitMetadata { source, .. }
| Error::InitDdlManager { source, .. }
| Error::InitReconciliationManager { source, .. } => source.status_code(),
Error::InitPendingFlowReconcileManager { source, .. } => source.status_code(),
Error::PendingFlowReconcile { source, .. } => source.status_code(),
Error::BuildTlsOptions { source, .. } => source.status_code(),
Error::Other { source, .. } => source.status_code(),

120
src/meta-srv/src/flow.rs Normal file
View File

@@ -0,0 +1,120 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use common_meta::ddl_manager::DdlManagerRef;
use common_telemetry::{error, info};
use futures::TryStreamExt;
use snafu::ResultExt;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::define_ticker;
use crate::error::{PendingFlowReconcileSnafu, Result};
const PENDING_FLOW_RECONCILE_INTERVAL: Duration = Duration::from_secs(10);
pub enum Event {
Tick,
}
pub type PendingFlowReconcileTickerRef = Arc<PendingFlowReconcileTicker>;
define_ticker!(
PendingFlowReconcileTicker,
event_type = Event,
event_value = Event::Tick
);
pub struct PendingFlowReconcileManager {
ddl_manager: DdlManagerRef,
receiver: Receiver<Event>,
}
impl PendingFlowReconcileManager {
pub fn new(ddl_manager: DdlManagerRef) -> (Self, PendingFlowReconcileTicker) {
let (sender, receiver) = Self::channel();
(
Self {
ddl_manager,
receiver,
},
PendingFlowReconcileTicker::new(PENDING_FLOW_RECONCILE_INTERVAL, sender),
)
}
fn channel() -> (Sender<Event>, Receiver<Event>) {
tokio::sync::mpsc::channel(8)
}
pub fn try_start(mut self) -> Result<()> {
common_runtime::spawn_global(async move { self.run().await });
info!("Pending flow reconcile manager started");
Ok(())
}
async fn run(&mut self) {
while let Some(event) = self.receiver.recv().await {
match event {
Event::Tick => {
if let Err(e) = self.handle_tick().await {
error!(e; "Failed to reconcile pending flows");
}
}
}
}
}
async fn handle_tick(&self) -> Result<()> {
let ddl_context = self.ddl_manager.create_context();
let flow_infos = ddl_context
.flow_metadata_manager
.flow_info_manager()
.flow_infos()
.try_collect::<Vec<_>>()
.await
.context(PendingFlowReconcileSnafu)?;
let pending_flows = flow_infos
.into_iter()
.filter_map(|(flow_id, flow_info)| {
flow_info
.is_pending()
.then_some((flow_id, flow_info.catalog_name().clone()))
})
.collect::<Vec<_>>();
for (flow_id, catalog_name) in pending_flows {
let current_flow_info = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await
.context(PendingFlowReconcileSnafu)?;
let Some(current_flow_info) = current_flow_info else {
continue;
};
if !current_flow_info.get_inner_ref().is_pending() {
continue;
}
let _ = self
.ddl_manager
.submit_activate_pending_flow_task(flow_id, catalog_name)
.await
.context(PendingFlowReconcileSnafu)?;
}
Ok(())
}
}

View File

@@ -25,6 +25,7 @@ pub mod election;
pub mod error;
pub mod events;
mod failure_detector;
pub mod flow;
pub mod gc;
pub mod handler;
pub mod key;

View File

@@ -70,6 +70,7 @@ use crate::error::{
StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::flow::PendingFlowReconcileTickerRef;
use crate::gc::{GcSchedulerOptions, GcTickerRef};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::procedure::ProcedureManagerListenerAdapter;
@@ -638,6 +639,7 @@ pub struct Metasrv {
region_flush_ticker: Option<RegionFlushTickerRef>,
table_id_allocator: ResourceIdAllocatorRef,
reconciliation_manager: ReconciliationManagerRef,
pending_flow_reconcile_ticker: Option<PendingFlowReconcileTickerRef>,
resource_stat: ResourceStatRef,
gc_ticker: Option<GcTickerRef>,
@@ -703,6 +705,9 @@ impl Metasrv {
if let Some(gc_ticker) = &self.gc_ticker {
leadership_change_notifier.add_listener(gc_ticker.clone() as _);
}
if let Some(pending_flow_reconcile_ticker) = &self.pending_flow_reconcile_ticker {
leadership_change_notifier.add_listener(pending_flow_reconcile_ticker.clone() as _);
}
if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
customizer.customize(&mut leadership_change_notifier);
}

View File

@@ -56,6 +56,7 @@ use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, BuildWalProviderSnafu, OtherSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::flow::{PendingFlowReconcileManager, PendingFlowReconcileTickerRef};
use crate::gc::GcScheduler;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::failure_handler::RegionFailureHandler;
@@ -556,6 +557,15 @@ impl MetasrvBuilder {
.try_start()
.context(error::InitReconciliationManagerSnafu)?;
let (pending_flow_reconcile_manager, pending_flow_reconcile_ticker) =
PendingFlowReconcileManager::new(ddl_manager.clone());
pending_flow_reconcile_manager
.try_start()
.map_err(common_error::ext::BoxedError::new)
.context(error::InitPendingFlowReconcileManagerSnafu)?;
let pending_flow_reconcile_ticker: Option<PendingFlowReconcileTickerRef> =
Some(Arc::new(pending_flow_reconcile_ticker));
let mut resource_stat = ResourceStatImpl::default();
resource_stat.start_collect_cpu_usage();
@@ -597,6 +607,7 @@ impl MetasrvBuilder {
region_flush_ticker,
table_id_allocator,
reconciliation_manager,
pending_flow_reconcile_ticker,
topic_stats_registry,
resource_stat: Arc::new(resource_stat),
gc_ticker,

View File

@@ -664,6 +664,7 @@ impl StatementExecutor {
expr: &CreateFlowExpr,
query_ctx: QueryContextRef,
) -> Result<FlowType> {
let mut has_missing_source_table = false;
// first check if source table's ttl is instant, if it is, force streaming mode
for src_table_name in &expr.source_table_names {
let table = self
@@ -676,14 +677,12 @@ impl StatementExecutor {
)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&src_table_name.catalog_name,
&src_table_name.schema_name,
&src_table_name.table_name,
),
})?;
.context(ExternalSnafu)?;
let Some(table) = table else {
has_missing_source_table = true;
continue;
};
// instant source table can only be handled by streaming mode
if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) {
@@ -700,6 +699,10 @@ impl StatementExecutor {
}
}
if has_missing_source_table {
return Ok(FlowType::Batching);
}
let engine = &self.query_engine;
let stmts = ParserContext::create_with_dialect(
&expr.sql,