feat: validate after parse

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-14 17:24:53 +08:00
parent 75c0d026c7
commit acee8a8ec5
2 changed files with 168 additions and 7 deletions

View File

@@ -1047,7 +1047,7 @@ pub fn to_create_flow_task_expr(
eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
flow_options: Default::default(),
flow_options: create_flow.flow_options.into_map(),
})
}
@@ -1065,6 +1065,8 @@ fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
use datatypes::value::Value;
use session::context::{QueryContext, QueryContextBuilder};
@@ -1327,6 +1329,52 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
to_dot_sep(expr.source_table_names[0].clone())
);
assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
assert!(expr.flow_options.is_empty());
let sql = r"
CREATE FLOW task_3
SINK TO schema_1.table_1
WITH (defer_on_missing_source = 'true', foo = 'bar')
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
assert_eq!(
expr.flow_options,
HashMap::from([
("defer_on_missing_source".to_string(), "true".to_string()),
("foo".to_string(), "bar".to_string()),
])
);
let sql = r"
CREATE FLOW task_4
SINK TO schema_1.table_1
WITH (defer_on_missing_source = true)
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
assert_eq!(
expr.flow_options,
HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)])
);
let sql = r"
CREATE FLOW abc.`task_2`

View File

@@ -113,6 +113,9 @@ struct DdlSubmitOptions {
timeout: Duration,
}
const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source";
const ALLOWED_FLOW_OPTIONS: [&str; 1] = [DEFER_ON_MISSING_SOURCE_KEY];
fn build_procedure_id_output(procedure_id: Vec<u8>) -> Result<Output> {
let procedure_id = String::from_utf8_lossy(&procedure_id).to_string();
let vector: VectorRef = Arc::new(StringVector::from(vec![procedure_id]));
@@ -152,6 +155,55 @@ fn parse_ddl_options(options: &OptionMap) -> Result<DdlSubmitOptions> {
Ok(DdlSubmitOptions { wait, timeout })
}
fn supported_flow_options() -> String {
ALLOWED_FLOW_OPTIONS.join(", ")
}
fn normalize_flow_bool_option(key: &str, value: &str) -> Result<String> {
value
.trim()
.to_ascii_lowercase()
.parse::<bool>()
.map(|value| value.to_string())
.map_err(|_| {
InvalidSqlSnafu {
err_msg: format!("invalid flow option '{key}': '{value}'"),
}
.build()
})
}
fn validate_and_normalize_flow_options(
options: HashMap<String, String>,
) -> Result<HashMap<String, String>> {
options
.into_iter()
.map(|(key, value)| {
if key == FlowType::FLOW_TYPE_KEY {
return InvalidSqlSnafu {
err_msg: format!("flow option '{key}' is reserved for internal use"),
}
.fail();
}
let normalized_value = match key.as_str() {
DEFER_ON_MISSING_SOURCE_KEY => normalize_flow_bool_option(&key, &value)?,
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"unknown flow option '{key}', supported options: {}",
supported_flow_options()
),
}
.fail();
}
};
Ok((key, normalized_value))
})
.collect()
}
impl StatementExecutor {
pub fn catalog_manager(&self) -> CatalogManagerRef {
self.catalog_manager.clone()
@@ -629,17 +681,16 @@ impl StatementExecutor {
expr: CreateFlowExpr,
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let mut expr = expr;
expr.flow_options = validate_and_normalize_flow_options(expr.flow_options)?;
let flow_type = self
.determine_flow_type(&expr, query_context.clone())
.await?;
info!("determined flow={} type: {:#?}", expr.flow_name, flow_type);
let expr = {
let mut expr = expr;
expr.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
expr
};
expr.flow_options
.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string());
let task = CreateFlowTask::try_from(PbCreateFlowTask {
create_flow: Some(expr),
@@ -2334,6 +2385,68 @@ mod test {
assert_eq!(Duration::from_secs(300), ddl_options.timeout);
}
#[test]
fn test_validate_and_normalize_flow_options_empty() {
assert!(
validate_and_normalize_flow_options(HashMap::new())
.unwrap()
.is_empty()
);
}
#[test]
fn test_validate_and_normalize_flow_options_valid() {
let options =
HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "TRUE".to_string())]);
assert_eq!(
validate_and_normalize_flow_options(options).unwrap(),
HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string(),)])
);
}
#[test]
fn test_validate_and_normalize_flow_options_unknown_option() {
let err = validate_and_normalize_flow_options(HashMap::from([(
"foo".to_string(),
"bar".to_string(),
)]))
.unwrap_err();
assert!(
err.to_string()
.contains("unknown flow option 'foo', supported options: defer_on_missing_source")
);
}
#[test]
fn test_validate_and_normalize_flow_options_reserved_option() {
let err = validate_and_normalize_flow_options(HashMap::from([(
FlowType::FLOW_TYPE_KEY.to_string(),
FlowType::BATCHING.to_string(),
)]))
.unwrap_err();
assert!(
err.to_string()
.contains("flow option 'flow_type' is reserved for internal use")
);
}
#[test]
fn test_validate_and_normalize_flow_options_invalid_bool() {
let err = validate_and_normalize_flow_options(HashMap::from([(
DEFER_ON_MISSING_SOURCE_KEY.to_string(),
"not-a-bool".to_string(),
)]))
.unwrap_err();
assert!(
err.to_string()
.contains("invalid flow option 'defer_on_missing_source': 'not-a-bool'")
);
}
#[test]
fn test_name_is_match() {
assert!(!NAME_PATTERN_REG.is_match("/adaf"));