chore: per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-24 11:27:45 +08:00
parent 431fbc3371
commit 1b2480611b
9 changed files with 249 additions and 70 deletions

1
Cargo.lock generated
View File

@@ -12578,6 +12578,7 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-stream",
"toml 0.8.23",
]
[[package]]

View File

@@ -44,6 +44,11 @@ use crate::lock_key::{CatalogLock, FlowLock, FlowNameLock};
use crate::metrics;
use crate::peer::Peer;
/// Activates a pending flow after all source tables become available.
///
/// The procedure resolves missing source tables, validates the final activation
/// mode, creates the runtime flow on flownodes, updates metadata to active, and
/// invalidates related caches.
pub struct ActivatePendingFlowProcedure {
pub context: DdlContext,
pub data: ActivatePendingFlowData,
@@ -104,6 +109,10 @@ impl ActivatePendingFlowProcedure {
.get_raw(self.data.flow_id)
.await?
else {
common_telemetry::debug!(
"Pending flow {} no longer exists during activation",
self.data.flow_id
);
return Ok(Status::done());
};
@@ -123,6 +132,12 @@ impl ActivatePendingFlowProcedure {
let resolution =
resolve_pending_flow_sources(&self.context, current_flow_info.get_inner_ref()).await?;
common_telemetry::debug!(
"Resolved pending flow {} source tables: {} resolved, {} unresolved",
self.data.flow_id,
resolution.resolved_table_ids.len(),
resolution.unresolved_source_table_names.len()
);
if !resolution.unresolved_source_table_names.is_empty() {
update_pending_flow_metadata(
&self.context,
@@ -136,19 +151,21 @@ impl ActivatePendingFlowProcedure {
return Ok(Status::done());
}
if let Some(reason) =
validate_batching_activation(&self.context, &resolution.resolved_table_ids).await?
{
update_pending_flow_metadata(
&self.context,
self.data.flow_id,
&current_flow_info,
resolution.resolved_table_ids,
vec![],
Some(reason),
)
.await?;
return Ok(Status::done());
if get_flow_type(current_flow_info.get_inner_ref()) == FlowType::Batching {
if let Some(reason) =
validate_batching_activation(&self.context, &resolution.resolved_table_ids).await?
{
update_pending_flow_metadata(
&self.context,
self.data.flow_id,
&current_flow_info,
resolution.resolved_table_ids,
vec![],
Some(reason),
)
.await?;
return Ok(Status::done());
}
}
self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?;
@@ -172,6 +189,11 @@ impl ActivatePendingFlowProcedure {
flow_info.get_inner_ref(),
&self.data.resolved_table_ids,
);
debug_assert_eq!(
self.data.resolved_table_ids.len(),
flow_info.get_inner_ref().all_source_table_names().len(),
"All source tables must be resolved before pending flow activation"
);
create_flow_on_peers(
&self.context,
&self.data.peers,
@@ -258,12 +280,12 @@ impl Procedure for ActivatePendingFlowProcedure {
// we should lock the flow name to avoid concurrent modification with create/replace flow procedure
LockKey::new(vec![
CatalogLock::Read(&self.data.catalog_name).into(),
FlowLock::Write(self.data.flow_id).into(),
FlowNameLock::new(
flow_info.get_inner_ref().catalog_name(),
flow_info.get_inner_ref().flow_name(),
)
.into(),
FlowLock::Write(self.data.flow_id).into(),
])
} else {
LockKey::new(vec![
@@ -368,7 +390,7 @@ fn build_create_request(
let mut flow_options = flow_info.options.clone();
flow_options.insert(
FlowType::FLOW_TYPE_KEY.to_string(),
FlowType::Batching.to_string(),
get_flow_type(flow_info).to_string(),
);
CreateRequest {
flow_id: Some(api::v1::FlowId { id: flow_id }),
@@ -425,6 +447,7 @@ fn build_active_flow_info(
peers: &[Peer],
resolved_table_ids: &[TableId],
) -> (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) {
let flow_type = get_flow_type(current_flow_info);
let flownode_ids = peers
.iter()
.enumerate()
@@ -442,15 +465,25 @@ fn build_active_flow_info(
new_flow_info.flownode_ids = flownode_ids;
new_flow_info.status = FlowStatus::Active;
new_flow_info.last_activation_error = None;
new_flow_info.options.insert(
FlowType::FLOW_TYPE_KEY.to_string(),
FlowType::Batching.to_string(),
);
new_flow_info
.options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
new_flow_info.updated_time = chrono::Utc::now();
(new_flow_info, flow_routes)
}
fn get_flow_type(flow_info: &FlowInfoValue) -> FlowType {
match flow_info
.options()
.get(FlowType::FLOW_TYPE_KEY)
.map(String::as_str)
{
Some(FlowType::STREAMING) => FlowType::Streaming,
_ => FlowType::Batching,
}
}
async fn invalidate_flow_cache(
context: &DdlContext,
flow_id: FlowId,

View File

@@ -438,7 +438,8 @@ pub enum CreateFlowState {
Prepare,
/// Creates flows on the flownode.
CreateFlows,
/// (Optional, only when replacing a old flow with a pending flow)Drops old flows on the flownode.
/// Optional state used when replacing an old flow with a pending flow.
/// Drops old flows on the flownode.
DropOldFlows,
/// Invalidate flow cache.
InvalidateFlowCache,

View File

@@ -22,6 +22,7 @@ use common_time::TimeToLive;
use table::table_name::TableName;
use crate::ddl::activate_flow::ActivatePendingFlowProcedure;
use crate::ddl::create_flow::FlowType;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::tests::create_flow::create_test_flow;
use crate::key::table_route::TableRouteValue;
@@ -256,3 +257,94 @@ async fn test_activate_pending_flow_uses_replace_semantics() {
assert!(req.or_replace);
}
}
#[tokio::test]
async fn test_activate_pending_flow_preserves_streaming_type() {
let source_table_names = vec![TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"activate_streaming_source_table",
)];
let sink_table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"activate_streaming_sink_table",
);
let handler = RecordingFlownodeHandler::default();
let node_manager = Arc::new(MockFlownodeManager::new(handler.clone()));
let ddl_context = new_ddl_context(node_manager);
let flow_id = create_test_flow(
&ddl_context,
"activate_streaming_pending_flow",
source_table_names,
sink_table_name,
)
.await;
let pending_flow_info = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await
.unwrap()
.unwrap();
let mut updated_flow_info = pending_flow_info.get_inner_ref().clone();
updated_flow_info.options.insert(
FlowType::FLOW_TYPE_KEY.to_string(),
FlowType::Streaming.to_string(),
);
ddl_context
.flow_metadata_manager
.update_flow_metadata(flow_id, &pending_flow_info, &updated_flow_info, vec![])
.await
.unwrap();
let create_table_task = test_create_table_task("activate_streaming_source_table", 1028);
ddl_context
.table_metadata_manager
.create_table_metadata(
create_table_task.table_info.clone(),
TableRouteValue::physical(vec![]),
Default::default(),
)
.await
.unwrap();
let mut procedure = ActivatePendingFlowProcedure::new(
flow_id,
DEFAULT_CATALOG_NAME.to_string(),
ddl_context.clone(),
);
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let activated_flow_id = output.downcast_ref::<u32>().unwrap();
assert_eq!(*activated_flow_id, flow_id);
let activated_flow = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
assert!(activated_flow.is_active());
assert_eq!(
activated_flow
.options()
.get(FlowType::FLOW_TYPE_KEY)
.map(String::as_str),
Some(FlowType::STREAMING)
);
let create_requests = handler.create_requests.lock().unwrap();
assert!(!create_requests.is_empty());
for req in create_requests.iter() {
assert_eq!(
req.flow_options
.get(FlowType::FLOW_TYPE_KEY)
.map(String::as_str),
Some(FlowType::STREAMING)
);
}
}

View File

@@ -124,6 +124,10 @@ async fn test_create_flow_source_table_not_found() {
.unwrap();
assert!(flow_info.is_pending());
assert_eq!(flow_info.unresolved_source_table_names().len(), 1);
assert_eq!(
flow_info.unresolved_source_table_names()[0].table_name,
"my_table"
);
}
pub(crate) async fn create_test_flow(

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::key::table_name::TableNameKey;
use common_telemetry::{error, info};
use futures::TryStreamExt;
use snafu::ResultExt;
@@ -79,41 +80,84 @@ impl PendingFlowReconcileManager {
async fn handle_tick(&self) -> Result<()> {
let ddl_context = self.ddl_manager.create_context();
let flow_infos = ddl_context
let ddl_manager = self.ddl_manager.clone();
ddl_context
.flow_metadata_manager
.flow_info_manager()
.flow_infos()
.try_collect::<Vec<_>>()
.try_for_each(move |(flow_id, flow_info)| {
let ddl_context = ddl_context.clone();
let ddl_manager = ddl_manager.clone();
async move {
if !flow_info.is_pending() {
return Ok(());
}
let current_flow_info = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await;
let current_flow_info = match current_flow_info {
Ok(current_flow_info) => current_flow_info,
Err(e) => {
error!(e; "Failed to load flow metadata for pending flow {}", flow_id);
return Ok(());
}
};
let Some(current_flow_info) = current_flow_info else {
return Ok(());
};
if !current_flow_info.get_inner_ref().is_pending() {
return Ok(());
}
let unresolved_source_table_names = current_flow_info
.get_inner_ref()
.unresolved_source_table_names();
if !unresolved_source_table_names.is_empty() {
let unresolved_table_keys = unresolved_source_table_names
.iter()
.map(|name| {
TableNameKey::new(
&name.catalog_name,
&name.schema_name,
&name.table_name,
)
})
.collect::<Vec<_>>();
let resolved_tables = ddl_context
.table_metadata_manager
.table_name_manager()
.batch_get(unresolved_table_keys)
.await;
let resolved_tables = match resolved_tables {
Ok(resolved_tables) => resolved_tables,
Err(e) => {
error!(e; "Failed to resolve source tables for pending flow {}", flow_id);
return Ok(());
}
};
if resolved_tables.iter().all(|table_id| table_id.is_none()) {
return Ok(());
}
}
if let Err(e) = ddl_manager
.submit_activate_pending_flow_task(
flow_id,
current_flow_info.get_inner_ref().catalog_name().clone(),
)
.await
{
error!(e; "Failed to reconcile pending flow {}", flow_id);
}
Ok(())
}
})
.await
.context(PendingFlowReconcileSnafu)?;
let pending_flows = flow_infos
.into_iter()
.filter_map(|(flow_id, flow_info)| {
flow_info
.is_pending()
.then_some((flow_id, flow_info.catalog_name().clone()))
})
.collect::<Vec<_>>();
for (flow_id, catalog_name) in pending_flows {
let current_flow_info = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await
.context(PendingFlowReconcileSnafu)?;
let Some(current_flow_info) = current_flow_info else {
continue;
};
if !current_flow_info.get_inner_ref().is_pending() {
continue;
}
let _ = self
.ddl_manager
.submit_activate_pending_flow_task(flow_id, catalog_name)
.await
.context(PendingFlowReconcileSnafu)?;
}
Ok(())
}

View File

@@ -657,8 +657,10 @@ impl StatementExecutor {
}
/// Determine the flow type based on the SQL query
/// Determines the flow type from source-table state and SQL shape.
///
/// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow
/// Missing source tables keep the flow pending and default it to batching.
/// Otherwise, aggregation, distinct, and TQL queries are batching; the rest are streaming.
async fn determine_flow_type(
&self,
expr: &CreateFlowExpr,
@@ -700,6 +702,10 @@ impl StatementExecutor {
}
if has_missing_source_table {
info!(
"Flow `{}` defaults to batching because some source tables are not available yet",
expr.flow_name
);
return Ok(FlowType::Batching);
}

View File

@@ -32,3 +32,4 @@ tinytemplate = "1.2"
tokio.workspace = true
tokio-postgres = { workspace = true }
tokio-stream.workspace = true
toml.workspace = true

View File

@@ -441,27 +441,24 @@ pub fn get_workspace_root() -> String {
}
fn get_target_dir_from_cargo_config() -> Option<PathBuf> {
let workspace_root = get_workspace_root();
let output = Command::new("cargo")
.current_dir(&workspace_root)
.args([
"config",
"get",
"build.target-dir",
"-Z",
"unstable-options",
"--format",
"json-value",
])
.output()
.ok()?;
if !output.status.success() {
return None;
if let Ok(target_dir) = std::env::var("CARGO_TARGET_DIR") {
let target_dir = PathBuf::from(target_dir);
return if target_dir.is_absolute() {
Some(target_dir)
} else {
Some(PathBuf::from(get_workspace_root()).join(target_dir))
};
}
let target_dir = std::str::from_utf8(&output.stdout).ok()?.trim();
let target_dir: String = serde_json::from_str(target_dir).ok()?;
let workspace_root = get_workspace_root();
let cargo_config = PathBuf::from(&workspace_root).join(".cargo/config.toml");
let cargo_config = std::fs::read_to_string(cargo_config).ok()?;
let cargo_config: toml::Value = toml::from_str(&cargo_config).ok()?;
let target_dir = cargo_config
.get("build")?
.get("target-dir")?
.as_str()?
.to_string();
let target_dir = PathBuf::from(target_dir);
if target_dir.is_absolute() {