fix: unquote flow_name in create flow expr (#5483)

* fix: unquote flow_name in create flow expr

* chore: per review

* fix: compat with older version
This commit is contained in:
discord9
2025-02-07 16:26:14 +08:00
committed by GitHub
parent 88c3d331a1
commit f29a1c56e9
4 changed files with 108 additions and 11 deletions

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::warn;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
@@ -37,6 +38,12 @@ lazy_static! {
"^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$"
))
.unwrap();
/// for compatibility with older flow name with less strict name pattern
static ref COMPAT_FLOW_NAME_KEY_PATTERN: Regex = Regex::new(&format!(
"^{FLOW_NAME_KEY_PREFIX}/({NAME_PATTERN})/(.*)$"
))
.unwrap();
}
/// The key of mapping {flow_name} to [FlowId].
@@ -114,12 +121,18 @@ impl<'a> MetadataKey<'a, FlowNameKeyInner<'a>> for FlowNameKeyInner<'_> {
}
.build()
})?;
let captures =
FLOW_NAME_KEY_PATTERN
.captures(key)
.context(error::InvalidMetadataSnafu {
err_msg: format!("Invalid FlowNameKeyInner '{key}'"),
})?;
let captures = FLOW_NAME_KEY_PATTERN
.captures(key)
.or_else(|| {
warn!(
"FlowNameKeyInner '{}' is not a valid flow name in newer version.",
key
);
COMPAT_FLOW_NAME_KEY_PATTERN.captures(key)
})
.context(error::InvalidMetadataSnafu {
err_msg: format!("Invalid FlowNameKeyInner '{key}'"),
})?;
// Safety: pass the regex check above
let catalog_name = captures.get(1).unwrap().as_str();
let flow_name = captures.get(2).unwrap().as_str();
@@ -284,6 +297,12 @@ mod tests {
let key = FlowNameKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flow_name(), "my_task");
// compatibility with older version
let bytes = b"__flow/name/my_catalog/a/`b`".to_vec();
let key = FlowNameKey::from_bytes(&bytes).unwrap();
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flow_name(), "a/`b`");
}
#[test]
fn test_key_start_range() {

View File

@@ -704,6 +704,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid flow name: {name}"))]
InvalidFlowName {
name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Empty {} expr", name))]
EmptyDdlExpr {
name: String,
@@ -821,6 +828,7 @@ impl ErrorExt for Error {
| Error::UnsupportedRegionRequest { .. }
| Error::InvalidTableName { .. }
| Error::InvalidViewName { .. }
| Error::InvalidFlowName { .. }
| Error::InvalidView { .. }
| Error::InvalidExpr { .. }
| Error::AdminFunctionNotFound { .. }

View File

@@ -38,7 +38,7 @@ use query::sql::{
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::ColumnOption;
use sql::ast::{ColumnOption, ObjectName};
use sql::statements::alter::{
AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
};
@@ -55,8 +55,9 @@ use table::table_reference::TableReference;
use crate::error::{
BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, IllegalPrimaryKeysDefSnafu,
InferFileTableSchemaSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu,
PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu,
ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu,
UnrecognizedTableOptionSnafu,
};
#[derive(Debug, Copy, Clone)]
@@ -731,7 +732,7 @@ pub fn to_create_flow_task_expr(
Ok(CreateFlowExpr {
catalog_name: query_ctx.current_catalog().to_string(),
flow_name: create_flow.flow_name.to_string(),
flow_name: sanitize_flow_name(create_flow.flow_name)?,
source_table_names,
sink_table_name: Some(sink_table_name),
or_replace: create_flow.or_replace,
@@ -743,6 +744,18 @@ pub fn to_create_flow_task_expr(
})
}
/// sanitize the flow name, remove possible quotes
fn sanitize_flow_name(mut flow_name: ObjectName) -> Result<String> {
ensure!(
flow_name.0.len() == 1,
InvalidFlowNameSnafu {
name: flow_name.to_string(),
}
);
// safety: we've checked flow_name.0 has exactly one element.
Ok(flow_name.0.swap_remove(0).value)
}
#[cfg(test)]
mod tests {
use api::v1::{SetDatabaseOptions, UnsetDatabaseOptions};
@@ -755,6 +768,62 @@ mod tests {
use super::*;
#[test]
fn test_create_flow_expr() {
let sql = r"
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();
let to_dot_sep =
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
assert_eq!("task_2", expr.flow_name);
assert_eq!("greptime", expr.catalog_name);
assert_eq!(
"greptime.schema_1.table_1",
expr.sink_table_name.map(to_dot_sep).unwrap()
);
assert_eq!(1, expr.source_table_names.len());
assert_eq!(
"greptime.schema_2.table_2",
to_dot_sep(expr.source_table_names[0].clone())
);
assert_eq!("SELECT max(c1), min(c2) FROM schema_2.table_2", expr.sql);
let sql = r"
CREATE FLOW abc.`task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let res = to_create_flow_task_expr(create_flow, &QueryContext::arc());
assert!(res.is_err());
assert!(res
.unwrap_err()
.to_string()
.contains("Invalid flow name: abc.`task_2`"));
}
#[test]
fn test_create_to_expr() {
let sql = "CREATE TABLE monitor (host STRING,ts TIMESTAMP,TIME INDEX (ts),PRIMARY KEY(host)) ENGINE=mito WITH(ttl='3days', write_buffer_size='1024KB');";

View File

@@ -1306,7 +1306,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
// create flow without `OR REPLACE`, `IF NOT EXISTS`, `EXPIRE AFTER` and `COMMENT`
let sql = r"
CREATE FLOW task_2
CREATE FLOW `task_2`
SINK TO schema_1.table_1
AS
SELECT max(c1), min(c2) FROM schema_2.table_2;";
@@ -1322,6 +1322,7 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
assert!(!create_task.if_not_exists);
assert!(create_task.expire_after.is_none());
assert!(create_task.comment.is_none());
assert_eq!(create_task.flow_name.to_string(), "`task_2`");
}
#[test]