feat: parse create flow with

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-14 15:21:19 +08:00
parent 1b2480611b
commit d290bc58ad
5 changed files with 129 additions and 14 deletions

View File

@@ -16,6 +16,7 @@ use std::sync::{Arc, Weak};
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::FlowId;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_info::FlowInfoValue;
@@ -54,6 +55,17 @@ use crate::system_schema::utils;
const INIT_CAPACITY: usize = 42;
fn user_visible_flow_options(
options: &std::collections::HashMap<String, String>,
) -> sql::statements::OptionMap {
sql::statements::OptionMap::from(
options
.iter()
.filter(|(key, _)| key.as_str() != FlowType::FLOW_TYPE_KEY)
.map(|(key, value)| (key.clone(), value.clone())),
)
}
// rows of information_schema.flows
// pk is (flow_name, flow_id, table_catalog)
pub const FLOW_NAME: &str = "flow_name";
@@ -165,6 +177,7 @@ impl InformationSchemaFlows {
expire_after: flow_info.expire_after(),
eval_interval: flow_info.eval_interval(),
comment,
flow_options: user_visible_flow_options(flow_info.options()),
query,
};

View File

@@ -151,21 +151,20 @@ impl ActivatePendingFlowProcedure {
return Ok(Status::done());
}
if get_flow_type(current_flow_info.get_inner_ref()) == FlowType::Batching {
if let Some(reason) =
if get_flow_type(current_flow_info.get_inner_ref()) == FlowType::Batching
&& let Some(reason) =
validate_batching_activation(&self.context, &resolution.resolved_table_ids).await?
{
update_pending_flow_metadata(
&self.context,
self.data.flow_id,
&current_flow_info,
resolution.resolved_table_ids,
vec![],
Some(reason),
)
.await?;
return Ok(Status::done());
}
{
update_pending_flow_metadata(
&self.context,
self.data.flow_id,
&current_flow_info,
resolution.resolved_table_ids,
vec![],
Some(reason),
)
.await?;
return Ok(Status::done());
}
self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?;

View File

@@ -33,6 +33,7 @@ use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::SchemaOptions;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::Output;
use common_query::prelude::greptime_timestamp;
@@ -78,6 +79,15 @@ const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
fn user_visible_flow_options(options: &HashMap<String, String>) -> OptionMap {
OptionMap::from(
options
.iter()
.filter(|(key, _)| key.as_str() != FlowType::FLOW_TYPE_KEY)
.map(|(key, value)| (key.clone(), value.clone())),
)
}
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
@@ -1062,6 +1072,7 @@ pub fn show_create_flow(
expire_after: flow_val.expire_after(),
eval_interval: flow_val.eval_interval(),
comment,
flow_options: user_visible_flow_options(flow_val.options()),
query,
};

View File

@@ -339,6 +339,14 @@ impl<'a> ParserContext<'a> {
None
};
let flow_options = self
.parser
.parse_options(Keyword::WITH)
.context(SyntaxSnafu)?
.into_iter()
.map(parse_option_string)
.collect::<Result<HashMap<String, OptionValue>>>()?;
self.parser
.expect_keyword(Keyword::AS)
.context(SyntaxSnafu)?;
@@ -353,6 +361,7 @@ impl<'a> ParserContext<'a> {
expire_after,
eval_interval,
comment,
flow_options: OptionMap::new(flow_options),
query,
}))
}
@@ -1256,6 +1265,20 @@ mod tests {
use crate::dialect::GreptimeDbDialect;
use crate::parser::ParseOptions;
fn string_option_map(
entries: impl IntoIterator<Item = (&'static str, &'static str)>,
) -> OptionMap {
OptionMap::new(entries.into_iter().map(|(key, value)| {
(
key.to_string(),
OptionValue::try_new(Expr::Value(
Value::SingleQuotedString(value.to_string()).into(),
))
.unwrap(),
)
}))
}
#[test]
fn test_parse_create_table_like() {
let sql = "CREATE TABLE t1 LIKE t2";
@@ -1498,6 +1521,8 @@ mod tests {
pub expire_after: Option<i64>,
/// Comment string
pub comment: Option<String>,
/// Flow creation options
pub flow_options: OptionMap,
}
let testcases = vec![
(
@@ -1518,6 +1543,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1538,6 +1564,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1558,6 +1585,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1578,6 +1606,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1597,6 +1626,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: false,
expire_after: Some(2 * 86400 + 3600 + 2 * 60),
comment: None,
flow_options: OptionMap::default(),
},
),
(
@@ -1616,6 +1646,7 @@ select max(c1), min(c2) from schema_2.table_2;",
if_not_exists: false,
expire_after: Some(600), // 10 minutes in seconds
comment: None,
flow_options: OptionMap::default(),
},
),
(
@@ -1636,6 +1667,27 @@ select max(c1), min(c2) from schema_2.table_2;",
if_not_exists: true,
expire_after: Some(7200), // 2 hours in seconds
comment: Some("lowercase test".to_string()),
flow_options: OptionMap::default(),
},
),
(
r"
CREATE FLOW task_5
SINK TO schema_1.table_1
WITH (defer_on_missing_source = 'true')
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName::from(vec![Ident::new("task_5")]),
sink_table_name: ObjectName::from(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: false,
if_not_exists: false,
expire_after: None,
comment: None,
flow_options: string_option_map([("defer_on_missing_source", "true")]),
},
),
];
@@ -1651,6 +1703,7 @@ select max(c1), min(c2) from schema_2.table_2;",
expire_after: expected.expire_after,
eval_interval: None,
comment: expected.comment,
flow_options: expected.flow_options,
// ignore query parse result
query: create_task.query.clone(),
};
@@ -1696,6 +1749,8 @@ select max(c1), min(c2) from schema_2.table_2;",
pub eval_interval: Option<i64>,
/// Comment string
pub comment: Option<String>,
/// Flow creation options
pub flow_options: OptionMap,
}
// create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
@@ -1719,6 +1774,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(300),
eval_interval: None,
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1740,6 +1796,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(300),
eval_interval: None,
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1762,6 +1819,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(300),
eval_interval: Some(10),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1784,6 +1842,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(300),
eval_interval: Some(10),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1806,6 +1865,32 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(2 * 86400 + 3600 + 2 * 60),
eval_interval: None,
comment: None,
flow_options: OptionMap::default(),
},
),
(
r"
CREATE FLOW task_3
SINK TO schema_1.table_1
EVAL INTERVAL '10 seconds'
WITH (defer_on_missing_source = 'true', foo = 'bar')
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
sink_table_name: ObjectName(vec![
ObjectNamePart::Identifier(Ident::new("schema_1")),
ObjectNamePart::Identifier(Ident::new("table_1")),
]),
or_replace: false,
if_not_exists: false,
expire_after: None,
eval_interval: Some(10),
comment: None,
flow_options: string_option_map([
("defer_on_missing_source", "true"),
("foo", "bar"),
]),
},
),
];
@@ -1821,6 +1906,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: expected.expire_after,
eval_interval: expected.eval_interval,
comment: expected.comment,
flow_options: expected.flow_options,
// ignore query parse result
query: create_task.query.clone(),
};

View File

@@ -612,6 +612,8 @@ pub struct CreateFlow {
pub eval_interval: Option<i64>,
/// Comment string
pub comment: Option<String>,
/// Flow creation options from `WITH (...)`
pub flow_options: OptionMap,
/// SQL statement
pub query: Box<SqlOrTql>,
}
@@ -669,6 +671,10 @@ impl Display for CreateFlow {
if let Some(comment) = &self.comment {
writeln!(f, "COMMENT '{}'", comment)?;
}
if !self.flow_options.is_empty() {
let options = self.flow_options.kv_pairs();
writeln!(f, "WITH ({})", format_list_comma!(options))?;
}
write!(f, "AS {}", &self.query)
}
}