fix: use full DDL of flow in information_schema.flows.flow_definition (#7704)

* fix: use full DDL of flow in information_schema.flows.flow_definition

* fix: add schema name in sink table
This commit is contained in:
Ning Sun
2026-02-12 08:09:40 +08:00
committed by GitHub
parent 77013d9085
commit bcfbd01582
8 changed files with 146 additions and 67 deletions

View File

@@ -36,6 +36,11 @@ use datatypes::vectors::{
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use sql::ast::Ident;
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use sql::statements::create::{CreateFlow, SqlOrTql};
use sql::statements::statement::Statement;
use store_api::storage::{ScanRequest, TableId};
use crate::CatalogManager;
@@ -121,6 +126,51 @@ impl InformationSchemaFlows {
))
}
/// Generates the CREATE FLOW statement for the flow_definition column
pub(crate) fn generate_show_create_flow(flow_info: &FlowInfoValue) -> Result<String> {
let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, flow_info.raw_sql())
.map_err(BoxedError::new)
.context(InternalSnafu)?;
let query = parser_ctx
.parse_statement()
.map_err(BoxedError::new)
.context(InternalSnafu)?;
let raw_query = match &query {
Statement::Tql(_) => flow_info.raw_sql().clone(),
_ => query.to_string(),
};
let query = Box::new(
SqlOrTql::try_from_statement(query, &raw_query)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
);
let comment = if flow_info.comment().is_empty() {
None
} else {
Some(flow_info.comment().clone())
};
let stmt = CreateFlow {
flow_name: sql::ast::ObjectName::from(vec![Ident::new(flow_info.flow_name())]),
sink_table_name: sql::ast::ObjectName::from(vec![
Ident::new(&flow_info.sink_table_name().schema_name),
Ident::new(&flow_info.sink_table_name().table_name),
]),
or_replace: false,
if_not_exists: true,
expire_after: flow_info.expire_after(),
eval_interval: flow_info.eval_interval(),
comment,
query,
};
Ok(stmt.to_string())
}
fn builder(&self) -> InformationSchemaFlowsBuilder {
InformationSchemaFlowsBuilder::new(
self.schema.clone(),
@@ -291,7 +341,10 @@ impl InformationSchemaFlowsBuilder {
.and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)),
);
self.table_catalogs.push(Some(flow_info.catalog_name()));
self.raw_sqls.push(Some(flow_info.raw_sql()));
self.raw_sqls
.push(Some(&InformationSchemaFlows::generate_show_create_flow(
&flow_info,
)?));
self.comments.push(Some(flow_info.comment()));
self.expire_afters.push(flow_info.expire_after());
self.source_table_id_groups.push(Some(

View File

@@ -1051,7 +1051,10 @@ pub fn show_create_flow(
let stmt = CreateFlow {
flow_name,
sink_table_name: ObjectName::from(vec![Ident::new(&flow_val.sink_table_name().table_name)]),
sink_table_name: ObjectName::from(vec![
Ident::new(&flow_val.sink_table_name().schema_name),
Ident::new(&flow_val.sink_table_name().table_name),
]),
// notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do
// so we set `or_replace` to false.
or_replace: false,