Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-23 13:03:25 +08:00
parent acee8a8ec5
commit 7ecf3eb35e
4 changed files with 72 additions and 22 deletions

View File

@@ -55,17 +55,6 @@ use crate::system_schema::utils;
const INIT_CAPACITY: usize = 42;
fn user_visible_flow_options(
options: &std::collections::HashMap<String, String>,
) -> sql::statements::OptionMap {
sql::statements::OptionMap::from(
options
.iter()
.filter(|(key, _)| key.as_str() != FlowType::FLOW_TYPE_KEY)
.map(|(key, value)| (key.clone(), value.clone())),
)
}
// rows of information_schema.flows
// pk is (flow_name, flow_id, table_catalog)
pub const FLOW_NAME: &str = "flow_name";
@@ -177,7 +166,10 @@ impl InformationSchemaFlows {
expire_after: flow_info.expire_after(),
eval_interval: flow_info.eval_interval(),
comment,
flow_options: user_visible_flow_options(flow_info.options()),
flow_options: sql::statements::OptionMap::from_filtered_string_map(
flow_info.options(),
&[FlowType::FLOW_TYPE_KEY],
),
query,
};

View File

@@ -1047,10 +1047,22 @@ pub fn to_create_flow_task_expr(
eval_interval: eval_interval.map(|seconds| api::v1::EvalInterval { seconds }),
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
flow_options: create_flow.flow_options.into_map(),
flow_options: stringify_flow_options(create_flow.flow_options)?,
})
}
fn stringify_flow_options(flow_options: OptionMap) -> Result<HashMap<String, String>> {
let options_len = flow_options.len();
let flow_options = flow_options.into_map();
ensure!(
flow_options.len() == options_len,
InvalidSqlSnafu {
err_msg: "flow options only support scalar string-compatible values".to_string(),
}
);
Ok(flow_options)
}
/// sanitize the flow name, remove possible quotes
fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
ensure!(
@@ -1376,6 +1388,29 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
HashMap::from([("defer_on_missing_source".to_string(), "true".to_string(),)])
);
let sql = r"
CREATE FLOW task_5
SINK TO schema_1.table_1
WITH (defer_on_missing_source = [true])
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
assert!(res.is_err());
assert!(
res.unwrap_err()
.to_string()
.contains("flow options only support scalar string-compatible values")
);
let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1

View File

@@ -80,14 +80,6 @@ const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
fn user_visible_flow_options(options: &HashMap<String, String>) -> OptionMap {
OptionMap::from(
options
.iter()
.filter(|(key, _)| key.as_str() != FlowType::FLOW_TYPE_KEY)
.map(|(key, value)| (key.clone(), value.clone())),
)
}
const COLUMN_NAME_COLUMN: &str = "Column";
const COLUMN_GREPTIME_TYPE_COLUMN: &str = "Greptime_type";
const COLUMN_TYPE_COLUMN: &str = "Type";
@@ -1066,7 +1058,10 @@ pub fn show_create_flow(
expire_after: flow_val.expire_after(),
eval_interval: flow_val.eval_interval(),
comment,
flow_options: user_visible_flow_options(flow_val.options()),
flow_options: OptionMap::from_filtered_string_map(
flow_val.options(),
&[FlowType::FLOW_TYPE_KEY],
),
query,
};

View File

@@ -49,6 +49,18 @@ impl OptionMap {
}
}
pub fn from_filtered_string_map(
options: &HashMap<String, String>,
hidden_keys: &[&str],
) -> Self {
Self::from(
options
.iter()
.filter(|(key, _)| !hidden_keys.contains(&key.as_str()))
.map(|(key, value)| (key.clone(), value.clone())),
)
}
pub fn insert(&mut self, k: String, v: String) {
if REDACTED_OPTIONS.contains(&k.as_str()) {
self.secrets.insert(k, SecretString::new(Box::new(v)));
@@ -221,6 +233,8 @@ impl VisitMut for OptionMap {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use crate::statements::OptionMap;
#[test]
@@ -237,4 +251,18 @@ mod tests {
map.insert("a.b".to_string(), "中文comment\n".to_string());
assert_eq!("'a.b' = '中文comment\\n'", map.kv_pairs()[0]);
}
#[test]
fn test_from_filtered_string_map() {
let map = OptionMap::from_filtered_string_map(
&HashMap::from([
("visible".to_string(), "1".to_string()),
("hidden".to_string(), "2".to_string()),
]),
&["hidden"],
);
assert_eq!(map.get("visible"), Some("1"));
assert_eq!(map.get("hidden"), None);
}
}