feat(flow): parse defer on miss src table (#7980)

* feat: parse create flow with

Signed-off-by: discord9 <discord9@163.com>

* feat: validate after parse

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* chore: sqlness

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-27 11:02:13 +08:00
committed by GitHub
parent 793545d8e6
commit d2d256909f
9 changed files with 427 additions and 8 deletions

View File

@@ -16,6 +16,7 @@ use std::sync::{Arc, Weak};
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::FlowId;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_info::FlowInfoValue;
@@ -168,6 +169,10 @@ impl InformationSchemaFlows {
expire_after: flow_info.expire_after(),
eval_interval: flow_info.eval_interval(),
comment,
flow_options: sql::statements::OptionMap::from_filtered_string_map(
flow_info.options(),
&[FlowType::FLOW_TYPE_KEY],
),
query,
};

View File

@@ -1047,10 +1047,22 @@ pub fn to_create_flow_task_expr(
eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
flow_options: Default::default(),
flow_options: stringify_flow_options(create_flow.flow_options)?,
})
}
fn stringify_flow_options(flow_options: OptionMap) -> Result<HashMap<String, String>> {
let options_len = flow_options.len();
let flow_options = flow_options.into_map();
ensure!(
flow_options.len() == options_len,
InvalidSqlSnafu {
err_msg: "flow options only support scalar string-compatible values".to_string(),
}
);
Ok(flow_options)
}
/// sanitize the flow name, remove possible quotes
fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
ensure!(
@@ -1065,6 +1077,8 @@ fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
use datatypes::value::Value;
use session::context::{QueryContext, QueryContextBuilder};
@@ -1327,6 +1341,75 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
to_dot_sep(expr.source_table_names[0].clone())
);
assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
assert!(expr.flow_options.is_empty());
let sql = r"
CREATE FLOW task_3
SINK TO schema_1.table_1
WITH (defer_on_missing_source = 'true', foo = 'bar')
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
assert_eq!(
expr.flow_options,
HashMap::from([
("defer_on_missing_source".to_string(), "true".to_string()),
("foo".to_string(), "bar".to_string()),
])
);
let sql = r"
CREATE FLOW task_4
SINK TO schema_1.table_1
WITH (defer_on_missing_source = true)
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
assert_eq!(
expr.flow_options,
HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)])
);
let sql = r"
CREATE FLOW task_5
SINK TO schema_1.table_1
WITH (defer_on_missing_source = [true])
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
assert!(res.is_err());
assert!(
res.unwrap_err()
.to_string()
.contains("flow options only support scalar string-compatible values")
);
let sql = r"
CREATE FLOW abc.`task_2`

View File

@@ -113,6 +113,9 @@ struct DdlSubmitOptions {
timeout: Duration,
}
const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source";
const ALLOWED_FLOW_OPTIONS: [&str; 1] = [DEFER_ON_MISSING_SOURCE_KEY];
fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
@@ -152,6 +155,55 @@ fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
Ok(DdlSubmitOptions { wait, timeout })
}
fn supported_flow_options() -> String {
ALLOWED_FLOW_OPTIONS.join(", ")
}
fn normalize_flow_bool_option(key: &str, value: &str) -> Result<String> {
value
.trim()
.to_ascii_lowercase()
.parse::<bool>()
.map(|value| value.to_string())
.map_err(|_| {
InvalidSqlSnafu {
err_msg: format!("invalid flow option '{key}': '{value}'"),
}
.build()
})
}
fn validate_and_normalize_flow_options(
options: HashMap<String, String>,
) -> Result<HashMap<String, String>> {
options
.into_iter()
.map(|(key, value)| {
if key == FlowType::FLOW_TYPE_KEY {
return InvalidSqlSnafu {
err_msg: format!("flow option '{key}' is reserved for internal use"),
}
.fail();
}
let normalized_value = match key.as_str() {
DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?,
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"unknown flow option '{key}', supported options: {}",
supported_flow_options()
),
}
.fail();
}
};
Ok((key, normalized_value))
})
.collect()
}
impl StatementExecutor {
pub fn catalog_manager(&self) -> CatalogManagerRef {
self.catalog_manager.clone()
@@ -626,20 +678,18 @@ impl StatementExecutor {
async fn create_flow_procedure(
&self,
expr: CreateFlowExpr,
mut expr: CreateFlowExpr,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?;
let flow_type = self
.determine_flow_type(&expr, query_context.clone())
.await?;
info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
let expr = {
let mut expr = expr;
expr.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
expr
};
expr.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
let task = CreateFlowTask::try_from(PbCreateFlowTask {
create_flow: Some(expr),
@@ -2334,6 +2384,94 @@ mod test {
assert_eq!(Duration::from_secs(300), ddl_options.timeout);
}
#[test]
fn test_validate_and_normalize_flow_options_empty() {
assert!(
validate_and_normalize_flow_options(HashMap::new())
.unwrap()
.is_empty()
);
}
#[test]
fn test_validate_and_normalize_flow_options_valid() {
let options =
HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string())]);
assert_eq!(
validate_and_normalize_flow_options(options).unwrap(),
HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),)])
);
}
#[test]
fn test_validate_and_normalize_flow_options_unknown_option() {
let err = validate_and_normalize_flow_options(HashMap::from([(
"foo".to_string(),
"bar".to_string(),
)]))
.unwrap_err();
assert!(
err.to_string()
.contains("unknown flow option 'foo', supported options: defer_on_missing_source")
);
}
#[test]
fn test_validate_and_normalize_flow_options_reserved_option() {
let err = validate_and_normalize_flow_options(HashMap::from([(
FlowType::FLOW_TYPE_KEY.to_string(),
FlowType::BATCHING.to_string(),
)]))
.unwrap_err();
assert!(
err.to_string()
.contains("flow option 'flow_type' is reserved for internal use")
);
}
#[test]
fn test_validate_and_normalize_flow_options_invalid_bool() {
let err = validate_and_normalize_flow_options(HashMap::from([(
DEFER_ON_MISSING_SOURCE_KEY.to_string(),
"not-a-bool".to_string(),
)]))
.unwrap_err();
assert!(
err.to_string()
.contains("invalid flow option 'defer_on_missing_source': 'not-a-bool'")
);
}
#[test]
fn test_validate_and_normalize_flow_options_rejects_redacted_invalid_input() {
let sql = r"
CREATE FLOW task_6
SINK TO schema_1.table_1
WITH (access_key_id = [true])
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr =
expr_helper::to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
let err = validate_and_normalize_flow_options(expr.flow_options).unwrap_err();
assert!(err.to_string().contains(
"unknown flow option 'access_key_id', supported options: defer_on_missing_source"
));
}
#[test]
fn test_name_is_match() {
assert!(!NAME_PATTERN_REG.is_match("/adaf"));

View File

@@ -33,6 +33,7 @@ use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
use common_meta::SchemaOptions;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_query::Output;
use common_query::prelude::greptime_timestamp;
@@ -78,6 +79,7 @@ const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
@@ -1056,6 +1058,10 @@ pub fn show_create_flow(
expire_after: flow_val.expire_after(),
eval_interval: flow_val.eval_interval(),
comment,
flow_options: OptionMap::from_filtered_string_map(
flow_val.options(),
&[FlowType::FLOW_TYPE_KEY],
),
query,
};

View File

@@ -68,6 +68,17 @@ pub const VECTOR: &str = "VECTOR";
pub type RawIntervalExpr = String;
// Preserve raw CREATE FLOW option entries until operator-side validation.
// Do not use `OptionMap::new()` here: it can drop non-string values for
// redacted keys before the flow option allowlist rejects them.
fn flow_option_map(options: HashMap<String, OptionValue>) -> OptionMap {
let mut flow_options = OptionMap::default();
for (key, value) in options {
flow_options.insert_options(&key, value);
}
flow_options
}
/// Parses create [table] statement
impl<'a> ParserContext<'a> {
pub(crate) fn parse_create(&mut self) -> Result<Statement> {
@@ -339,6 +350,14 @@ impl<'a> ParserContext<'a> {
None
};
let flow_options = self
.parser
.parse_options(Keyword::WITH)
.context(SyntaxSnafu)?
.into_iter()
.map(parse_option_string)
.collect::<Result<HashMap<String, OptionValue>>>()?;
self.parser
.expect_keyword(Keyword::AS)
.context(SyntaxSnafu)?;
@@ -353,6 +372,7 @@ impl<'a> ParserContext<'a> {
expire_after,
eval_interval,
comment,
flow_options: flow_option_map(flow_options),
query,
}))
}
@@ -1256,6 +1276,20 @@ mod tests {
use crate::dialect::GreptimeDbDialect;
use crate::parser::ParseOptions;
fn string_option_map(
entries: impl IntoIterator<Item = (&'static str, &'static str)>,
) -> OptionMap {
OptionMap::new(entries.into_iter().map(|(key, value)| {
(
key.to_string(),
OptionValue::try_new(Expr::Value(
Value::SingleQuotedString(value.to_string()).into(),
))
.unwrap(),
)
}))
}
#[test]
fn test_parse_create_table_like() {
let sql = "CREATE TABLE t1 LIKE t2";
@@ -1498,6 +1532,8 @@ mod tests {
pub expire_after: Option<i64>,
/// Comment string
pub comment: Option<String>,
/// Flow creation options
pub flow_options: OptionMap,
}
let testcases = vec![
(
@@ -1518,6 +1554,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1538,6 +1575,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1558,6 +1596,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1578,6 +1617,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: true,
expire_after: Some(300),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1597,6 +1637,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
if_not_exists: false,
expire_after: Some(2 * 86400 + 3600 + 2 * 60),
comment: None,
flow_options: OptionMap::default(),
},
),
(
@@ -1616,6 +1657,7 @@ select max(c1), min(c2) from schema_2.table_2;",
if_not_exists: false,
expire_after: Some(600), // 10 minutes in seconds
comment: None,
flow_options: OptionMap::default(),
},
),
(
@@ -1636,6 +1678,27 @@ select max(c1), min(c2) from schema_2.table_2;",
if_not_exists: true,
expire_after: Some(7200), // 2 hours in seconds
comment: Some("lowercase test".to_string()),
flow_options: OptionMap::default(),
},
),
(
r"
CREATE FLOW task_5
SINK TO schema_1.table_1
WITH (defer_on_missing_source = 'true')
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName::from(vec![Ident::new("task_5")]),
sink_table_name: ObjectName::from(vec![
Ident::new("schema_1"),
Ident::new("table_1"),
]),
or_replace: false,
if_not_exists: false,
expire_after: None,
comment: None,
flow_options: string_option_map([("defer_on_missing_source", "true")]),
},
),
];
@@ -1651,6 +1714,7 @@ select max(c1), min(c2) from schema_2.table_2;",
expire_after: expected.expire_after,
eval_interval: None,
comment: expected.comment,
flow_options: expected.flow_options,
// ignore query parse result
query: create_task.query.clone(),
};
@@ -1696,6 +1760,8 @@ select max(c1), min(c2) from schema_2.table_2;",
pub eval_interval: Option<i64>,
/// Comment string
pub comment: Option<String>,
/// Flow creation options
pub flow_options: OptionMap,
}
// create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
@@ -1719,6 +1785,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(300),
eval_interval: None,
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1740,6 +1807,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(300),
eval_interval: None,
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1762,6 +1830,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(300),
eval_interval: Some(10),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1784,6 +1853,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(300),
eval_interval: Some(10),
comment: Some("test comment".to_string()),
flow_options: OptionMap::default(),
},
),
(
@@ -1806,6 +1876,32 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: Some(2 * 86400 + 3600 + 2 * 60),
eval_interval: None,
comment: None,
flow_options: OptionMap::default(),
},
),
(
r"
CREATE FLOW task_3
SINK TO schema_1.table_1
EVAL INTERVAL '10 seconds'
WITH (defer_on_missing_source = 'true', foo = 'bar')
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;",
CreateFlowWoutQuery {
flow_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("task_3"))]),
sink_table_name: ObjectName(vec![
ObjectNamePart::Identifier(Ident::new("schema_1")),
ObjectNamePart::Identifier(Ident::new("table_1")),
]),
or_replace: false,
if_not_exists: false,
expire_after: None,
eval_interval: Some(10),
comment: None,
flow_options: string_option_map([
("defer_on_missing_source", "true"),
("foo", "bar"),
]),
},
),
];
@@ -1821,6 +1917,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
expire_after: expected.expire_after,
eval_interval: expected.eval_interval,
comment: expected.comment,
flow_options: expected.flow_options,
// ignore query parse result
query: create_task.query.clone(),
};

View File

@@ -615,6 +615,8 @@ pub struct CreateFlow {
pub eval_interval: Option<i64>,
/// Comment string
pub comment: Option<String>,
/// Flow creation options from `WITH (...)`
pub flow_options: OptionMap,
/// SQL statement
pub query: Box<SqlOrTql>,
}
@@ -672,6 +674,10 @@ impl Display for CreateFlow {
if let Some(comment) = &self.comment {
writeln!(f, "COMMENT '{}'", comment)?;
}
if !self.flow_options.is_empty() {
let options = self.flow_options.kv_pairs();
writeln!(f, "WITH ({})", format_list_comma!(options))?;
}
write!(f, "AS {}", &self.query)
}
}

View File

@@ -49,6 +49,18 @@ impl OptionMap {
}
}
pub fn from_filtered_string_map(
options: &HashMap<String, String>,
hidden_keys: &[&str],
) -> Self {
Self::from(
options
.iter()
.filter(|(key, _)| !hidden_keys.contains(&key.as_str()))
.map(|(key, value)| (key.clone(), value.clone())),
)
}
pub fn insert(&mut self, k: String, v: String) {
if REDACTED_OPTIONS.contains(&k.as_str()) {
self.secrets.insert(k, SecretString::new(Box::new(v)));
@@ -221,6 +233,8 @@ impl VisitMut for OptionMap {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use crate::statements::OptionMap;
#[test]
@@ -237,4 +251,18 @@ mod tests {
map.insert("a.b".to_string(), "中文comment\n".to_string());
assert_eq!("'a.b' = '中文comment\\n'", map.kv_pairs()[0]);
}
#[test]
fn test_from_filtered_string_map() {
let map = OptionMap::from_filtered_string_map(
&HashMap::from([
("visible".to_string(), "1".to_string()),
("hidden".to_string(), "2".to_string()),
]),
&["hidden"],
);
assert_eq!(map.get("visible"), Some("1"));
assert_eq!(map.get("hidden"), None);
}
}

View File

@@ -442,6 +442,46 @@ DROP FLOW filter_numbers_show;
Affected Rows: 0
CREATE FLOW filter_numbers_show
SINK TO out_num_cnt_show
WITH (defer_on_missing_source = true)
AS SELECT number AS n1 FROM numbers_input_show where number > 10;
Affected Rows: 0
SHOW CREATE FLOW filter_numbers_show;
+---------------------+------------------------------------------------------------------+
| Flow | Create Flow |
+---------------------+------------------------------------------------------------------+
| filter_numbers_show | CREATE FLOW IF NOT EXISTS filter_numbers_show |
| | SINK TO public.out_num_cnt_show |
| | WITH (defer_on_missing_source = 'true') |
| | AS SELECT number AS n1 FROM numbers_input_show WHERE number > 10 |
+---------------------+------------------------------------------------------------------+
SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+------------------------------------------------------------------+
| flow_definition |
+------------------------------------------------------------------+
| CREATE FLOW IF NOT EXISTS filter_numbers_show |
| SINK TO public.out_num_cnt_show |
| WITH (defer_on_missing_source = 'true') |
| AS SELECT number AS n1 FROM numbers_input_show WHERE number > 10 |
+------------------------------------------------------------------+
CREATE FLOW invalid_flow_option_show
SINK TO out_num_cnt_show
WITH (access_key_id = [true])
AS SELECT number AS n1 FROM numbers_input_show where number > 10;
Error: 1004(InvalidArguments), Invalid SQL, error: unknown flow option 'access_key_id', supported options: defer_on_missing_source
DROP FLOW filter_numbers_show;
Affected Rows: 0
drop table out_num_cnt_show;
Affected Rows: 0

View File

@@ -168,6 +168,22 @@ SELECT number FROM out_num_cnt_show;
DROP FLOW filter_numbers_show;
CREATE FLOW filter_numbers_show
SINK TO out_num_cnt_show
WITH (defer_on_missing_source = true)
AS SELECT number AS n1 FROM numbers_input_show where number > 10;
SHOW CREATE FLOW filter_numbers_show;
SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
CREATE FLOW invalid_flow_option_show
SINK TO out_num_cnt_show
WITH (access_key_id = [true])
AS SELECT number AS n1 FROM numbers_input_show where number > 10;
DROP FLOW filter_numbers_show;
drop table out_num_cnt_show;
drop table numbers_input_show;