diff --git a/flake.lock b/flake.lock index 0bc3e6f283..bec6e18d9a 100644 --- a/flake.lock +++ b/flake.lock @@ -8,11 +8,11 @@ "rust-analyzer-src": "rust-analyzer-src" }, "locked": { - "lastModified": 1765252472, - "narHash": "sha256-byMt/uMi7DJ8tRniFopDFZMO3leSjGp6GS4zWOFT+uQ=", + "lastModified": 1770794449, + "narHash": "sha256-1nFkhcZx9+Sdw5OXwJqp5TxvGncqRqLeK781v0XV3WI=", "owner": "nix-community", "repo": "fenix", - "rev": "8456b985f6652e3eef0632ee9992b439735c5544", + "rev": "b19d93fdf9761e6101f8cb5765d638bacebd9a1b", "type": "github" }, "original": { @@ -41,11 +41,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1764983851, - "narHash": "sha256-y7RPKl/jJ/KAP/VKLMghMgXTlvNIJMHKskl8/Uuar7o=", + "lastModified": 1770617025, + "narHash": "sha256-1jZvgZoAagZZB6NwGRv2T2ezPy+X6EFDsJm+YSlsvEs=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "d9bc5c7dceb30d8d6fafa10aeb6aa8a48c218454", + "rev": "2db38e08fdadcc0ce3232f7279bab59a15b94482", "type": "github" }, "original": { @@ -65,11 +65,11 @@ "rust-analyzer-src": { "flake": false, "locked": { - "lastModified": 1765120009, - "narHash": "sha256-nG76b87rkaDzibWbnB5bYDm6a52b78A+fpm+03pqYIw=", + "lastModified": 1770702974, + "narHash": "sha256-CbvWu72rpGHK5QynoXwuOnVzxX7njF2LYgk8wRSiAQ0=", "owner": "rust-lang", "repo": "rust-analyzer", - "rev": "5e3e9c4e61bba8a5e72134b9ffefbef8f531d008", + "rev": "07a594815f7c1d6e7e39f21ddeeedb75b21795f4", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index 5da5edef00..0b43f46c5f 100644 --- a/flake.nix +++ b/flake.nix @@ -51,6 +51,9 @@ buildInputs = buildInputs; NIX_HARDENING_ENABLE = ""; + LD_LIBRARY_PATH = with pkgs; lib.makeLibraryPath [ + stdenv.cc.cc.lib + ]; }; }); } diff --git a/src/catalog/src/system_schema/information_schema/flows.rs b/src/catalog/src/system_schema/information_schema/flows.rs index 5a6c3c755e..a751716620 100644 --- a/src/catalog/src/system_schema/information_schema/flows.rs +++ b/src/catalog/src/system_schema/information_schema/flows.rs @@ -36,6 +36,11 @@ use datatypes::vectors::{ }; use futures::TryStreamExt; use snafu::{OptionExt, ResultExt}; +use sql::ast::Ident; +use sql::dialect::GreptimeDbDialect; +use sql::parser::ParserContext; +use sql::statements::create::{CreateFlow, SqlOrTql}; +use sql::statements::statement::Statement; use store_api::storage::{ScanRequest, TableId}; use crate::CatalogManager; @@ -121,6 +126,51 @@ impl InformationSchemaFlows { )) } + /// Generates the CREATE FLOW statement for the flow_definition column + pub(crate) fn generate_show_create_flow(flow_info: &FlowInfoValue) -> Result { + let mut parser_ctx = ParserContext::new(&GreptimeDbDialect {}, flow_info.raw_sql()) + .map_err(BoxedError::new) + .context(InternalSnafu)?; + + let query = parser_ctx + .parse_statement() + .map_err(BoxedError::new) + .context(InternalSnafu)?; + + let raw_query = match &query { + Statement::Tql(_) => flow_info.raw_sql().clone(), + _ => query.to_string(), + }; + + let query = Box::new( + SqlOrTql::try_from_statement(query, &raw_query) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + ); + + let comment = if flow_info.comment().is_empty() { + None + } else { + Some(flow_info.comment().clone()) + }; + + let stmt = CreateFlow { + flow_name: sql::ast::ObjectName::from(vec![Ident::new(flow_info.flow_name())]), + sink_table_name: sql::ast::ObjectName::from(vec![ + Ident::new(&flow_info.sink_table_name().schema_name), + Ident::new(&flow_info.sink_table_name().table_name), + ]), + or_replace: false, + if_not_exists: true, + expire_after: flow_info.expire_after(), + eval_interval: flow_info.eval_interval(), + comment, + query, + }; + + Ok(stmt.to_string()) + } + fn builder(&self) -> InformationSchemaFlowsBuilder { InformationSchemaFlowsBuilder::new( self.schema.clone(), @@ -291,7 +341,10 @@ impl InformationSchemaFlowsBuilder { .and_then(|state| state.state_size.get(&flow_id).map(|v| *v as u64)), ); self.table_catalogs.push(Some(flow_info.catalog_name())); - self.raw_sqls.push(Some(flow_info.raw_sql())); + self.raw_sqls + .push(Some(&InformationSchemaFlows::generate_show_create_flow( + &flow_info, + )?)); self.comments.push(Some(flow_info.comment())); self.expire_afters.push(flow_info.expire_after()); self.source_table_id_groups.push(Some( diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index dcff099e82..49e26c92ca 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -1051,7 +1051,10 @@ pub fn show_create_flow( let stmt = CreateFlow { flow_name, - sink_table_name: ObjectName::from(vec![Ident::new(&flow_val.sink_table_name().table_name)]), + sink_table_name: ObjectName::from(vec![ + Ident::new(&flow_val.sink_table_name().schema_name), + Ident::new(&flow_val.sink_table_name().table_name), + ]), // notice we don't want `OR REPLACE` and `IF NOT EXISTS` in same sql since it's unclear what to do // so we set `or_replace` to false. or_replace: false, diff --git a/tests/cases/standalone/common/comment.result b/tests/cases/standalone/common/comment.result index 9da9541977..e800beb5ee 100644 --- a/tests/cases/standalone/common/comment.result +++ b/tests/cases/standalone/common/comment.result @@ -196,7 +196,7 @@ SHOW CREATE FLOW flow_comment_test; | Flow | Create Flow | +-------------------+------------------------------------------------------+ | flow_comment_test | CREATE FLOW IF NOT EXISTS flow_comment_test | -| | SINK TO flow_sink_comment_test | +| | SINK TO public.flow_sink_comment_test | | | COMMENT 'flow level description' | | | AS SELECT desc_str, ts FROM flow_source_comment_test | +-------------------+------------------------------------------------------+ @@ -222,7 +222,7 @@ SHOW CREATE FLOW flow_comment_test; | Flow | Create Flow | +-------------------+------------------------------------------------------+ | flow_comment_test | CREATE FLOW IF NOT EXISTS flow_comment_test | -| | SINK TO flow_sink_comment_test | +| | SINK TO public.flow_sink_comment_test | | | AS SELECT desc_str, ts FROM flow_source_comment_test | +-------------------+------------------------------------------------------+ diff --git a/tests/cases/standalone/common/flow/flow_aft_alter.result b/tests/cases/standalone/common/flow/flow_aft_alter.result index 9ee6080e47..6425f72da8 100644 --- a/tests/cases/standalone/common/flow/flow_aft_alter.result +++ b/tests/cases/standalone/common/flow/flow_aft_alter.result @@ -35,7 +35,7 @@ SHOW CREATE FLOW find_approx_rate; | Flow | Create Flow | +------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ | find_approx_rate | CREATE FLOW IF NOT EXISTS find_approx_rate | -| | SINK TO approx_rate | +| | SINK TO public.approx_rate | | | AS SELECT (max(byte) - min(byte)) / 30.0 AS rate, date_bin(INTERVAL '30 second', ts) AS time_window FROM bytes_log GROUP BY time_window | +------------------+-----------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.result b/tests/cases/standalone/common/flow/flow_auto_sink_table.result index 90d53b9598..f38fe48d4a 100644 --- a/tests/cases/standalone/common/flow/flow_auto_sink_table.result +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.result @@ -79,7 +79,7 @@ SHOW CREATE FLOW test_numbers_basic; | Flow | Create Flow | +--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ | test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic | -| | SINK TO out_num_cnt_basic | +| | SINK TO public.out_num_cnt_basic | | | AS SELECT sum(number), date_bin(INTERVAL '1 second', ts, '2021-07-01 00:00:00') AS time_window FROM numbers_input_basic GROUP BY time_window | +--------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ @@ -158,7 +158,7 @@ SHOW CREATE FLOW test_numbers_basic; | Flow | Create Flow | +--------------------+---------------------------------------------------------------------------------------+ | test_numbers_basic | CREATE FLOW IF NOT EXISTS test_numbers_basic | -| | SINK TO out_num_cnt_basic | +| | SINK TO public.out_num_cnt_basic | | | AS SELECT sum(number) AS sumup, ts AS event_time FROM numbers_input_basic GROUP BY ts | +--------------------+---------------------------------------------------------------------------------------+ diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 7d6bc03cf5..2e53e943dc 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -35,17 +35,19 @@ SHOW CREATE FLOW filter_numbers_show; | Flow | Create Flow | +---------------------+------------------------------------------------------------+ | filter_numbers_show | CREATE FLOW IF NOT EXISTS filter_numbers_show | -| | SINK TO out_num_cnt_show | +| | SINK TO public.out_num_cnt_show | | | AS SELECT number FROM numbers_input_show WHERE number > 10 | +---------------------+------------------------------------------------------------+ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+---------------------------------------------------------+------------------------------------+ -| flow_name | table_catalog | flow_definition | source_table_names | -+---------------------+---------------+---------------------------------------------------------+------------------------------------+ -| filter_numbers_show | greptime | SELECT number FROM numbers_input_show where number > 10 | greptime.public.numbers_input_show | -+---------------------+---------------+---------------------------------------------------------+------------------------------------+ ++---------------------+---------------+------------------------------------------------------------+------------------------------------+ +| flow_name | table_catalog | flow_definition | source_table_names | ++---------------------+---------------+------------------------------------------------------------+------------------------------------+ +| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| | | SINK TO public.out_num_cnt_show | | +| | | AS SELECT number FROM numbers_input_show WHERE number > 10 | | ++---------------------+---------------+------------------------------------------------------------+------------------------------------+ SHOW FLOWS LIKE 'filter_numbers_show'; @@ -77,11 +79,13 @@ Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| flow_name | table_catalog | flow_definition | source_table_names | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show where number > 10 | greptime.public.numbers_input_show | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| flow_name | table_catalog | flow_definition | source_table_names | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| | | SINK TO public.out_num_cnt_show | | +| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 10 | | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ -- this one should error out -- (flow exists, replace, if not exists)=(true, false, false) @@ -91,11 +95,13 @@ Error: 8000(FlowAlreadyExists), Flow already exists: greptime.filter_numbers_sho SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| flow_name | table_catalog | flow_definition | source_table_names | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show where number > 10 | greptime.public.numbers_input_show | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| flow_name | table_catalog | flow_definition | source_table_names | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| | | SINK TO public.out_num_cnt_show | | +| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 10 | | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ -- makesure it's not replaced in flownode INSERT INTO numbers_input_show VALUES (10, 0),(15, 1),(16, 2); @@ -128,11 +134,13 @@ Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| flow_name | table_catalog | flow_definition | source_table_names | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show where number > 10 | greptime.public.numbers_input_show | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| flow_name | table_catalog | flow_definition | source_table_names | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| | | SINK TO public.out_num_cnt_show | | +| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 10 | | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ -- makesure it's not replaced in flownode INSERT INTO numbers_input_show VALUES (4,4),(5,4),(10, 3),(11, 4); @@ -166,11 +174,13 @@ Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+------------------------------------------------------------+------------------------------------+ -| flow_name | table_catalog | flow_definition | source_table_names | -+---------------------+---------------+------------------------------------------------------------+------------------------------------+ -| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show where number > 3 | greptime.public.numbers_input_show | -+---------------------+---------------+------------------------------------------------------------+------------------------------------+ ++---------------------+---------------+---------------------------------------------------------------+------------------------------------+ +| flow_name | table_catalog | flow_definition | source_table_names | ++---------------------+---------------+---------------------------------------------------------------+------------------------------------+ +| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| | | SINK TO public.out_num_cnt_show | | +| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 3 | | ++---------------------+---------------+---------------------------------------------------------------+------------------------------------+ -- makesure it's replaced in flownode INSERT INTO numbers_input_show VALUES (3, 1),(4, 2),(10, 3),(11, 4); @@ -206,11 +216,13 @@ Error: 1001(Unsupported), Unsupported operation Create flow with both `IF NOT EX SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+------------------------------------------------------------+------------------------------------+ -| flow_name | table_catalog | flow_definition | source_table_names | -+---------------------+---------------+------------------------------------------------------------+------------------------------------+ -| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show where number > 3 | greptime.public.numbers_input_show | -+---------------------+---------------+------------------------------------------------------------+------------------------------------+ ++---------------------+---------------+---------------------------------------------------------------+------------------------------------+ +| flow_name | table_catalog | flow_definition | source_table_names | ++---------------------+---------------+---------------------------------------------------------------+------------------------------------+ +| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| | | SINK TO public.out_num_cnt_show | | +| | | AS SELECT number, ts FROM numbers_input_show WHERE number > 3 | | ++---------------------+---------------+---------------------------------------------------------------+------------------------------------+ DROP FLOW filter_numbers_show; @@ -238,11 +250,13 @@ Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| flow_name | table_catalog | flow_definition | source_table_names | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show where number > -2 | greptime.public.numbers_input_show | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| flow_name | table_catalog | flow_definition | source_table_names | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| | | SINK TO public.out_num_cnt_show | | +| | | AS SELECT number, ts FROM numbers_input_show WHERE number > -2 | | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ DROP FLOW filter_numbers_show; @@ -255,11 +269,13 @@ Affected Rows: 0 SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| flow_name | table_catalog | flow_definition | source_table_names | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show where number > -3 | greptime.public.numbers_input_show | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| flow_name | table_catalog | flow_definition | source_table_names | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| | | SINK TO public.out_num_cnt_show | | +| | | AS SELECT number, ts FROM numbers_input_show WHERE number > -3 | | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ -- makesure after recover should be the same -- SQLNESS ARG restart=true @@ -274,11 +290,13 @@ SELECT 1; -- SQLNESS SLEEP 3s SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| flow_name | table_catalog | flow_definition | source_table_names | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ -| filter_numbers_show | greptime | SELECT number, ts FROM numbers_input_show where number > -3 | greptime.public.numbers_input_show | -+---------------------+---------------+-------------------------------------------------------------+------------------------------------+ ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| flow_name | table_catalog | flow_definition | source_table_names | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ +| filter_numbers_show | greptime | CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| | | SINK TO public.out_num_cnt_show | | +| | | AS SELECT number, ts FROM numbers_input_show WHERE number > -3 | | ++---------------------+---------------+----------------------------------------------------------------+------------------------------------+ SELECT * FROM out_num_cnt_show; @@ -386,11 +404,13 @@ Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow infe SELECT flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -+---------------------------------------------------------------+------------------------------------+ -| flow_definition | source_table_names | -+---------------------------------------------------------------+------------------------------------+ -| SELECT number as n1 FROM numbers_input_show where number > 10 | greptime.public.numbers_input_show | -+---------------------------------------------------------------+------------------------------------+ ++------------------------------------------------------------------+------------------------------------+ +| flow_definition | source_table_names | ++------------------------------------------------------------------+------------------------------------+ +| CREATE FLOW IF NOT EXISTS filter_numbers_show | greptime.public.numbers_input_show | +| SINK TO public.out_num_cnt_show | | +| AS SELECT number AS n1 FROM numbers_input_show WHERE number > 10 | | ++------------------------------------------------------------------+------------------------------------+ INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3);