From 367a25af06eadbc7649f82e740583646bc18baae Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 29 Aug 2025 20:16:03 +0800 Subject: [PATCH] feat: flow prom ql auto sink table is also promql-able (#6852) * feat: flow prom ql auto sink table is also promql-able Signed-off-by: discord9 * fix: gen create table expr without aggr/projection outermost Signed-off-by: discord9 * test: update non-aggr testcase Signed-off-by: discord9 --------- Signed-off-by: discord9 --- src/flow/src/batching_mode/frontend_client.rs | 10 +- src/flow/src/batching_mode/task.rs | 79 ++++++--- src/flow/src/batching_mode/utils.rs | 123 ++++++++------ src/flow/src/error.rs | 16 +- .../distributed/flow-tql/flow_tql.result | 151 ++++++++++++------ tests/cases/distributed/flow-tql/flow_tql.sql | 93 +---------- .../cases/standalone/flow-tql/flow_tql.result | 151 ++++++++++++------ tests/cases/standalone/flow-tql/flow_tql.sql | 30 +++- 8 files changed, 385 insertions(+), 268 deletions(-) mode change 100644 => 120000 tests/cases/distributed/flow-tql/flow_tql.sql diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index a91cbe433e..22ca444262 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -41,8 +41,8 @@ use snafu::{OptionExt, ResultExt}; use crate::batching_mode::BatchingModeOptions; use crate::error::{ - ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, - UnexpectedSnafu, + CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu, + NoAvailableFrontendSnafu, UnexpectedSnafu, }; use crate::{Error, FlowAuthHeader}; @@ -290,13 +290,17 @@ impl FrontendClient { ) -> Result { self.handle( Request::Ddl(api::v1::DdlRequest { - expr: Some(api::v1::ddl_request::Expr::CreateTable(create)), + expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())), }), catalog, schema, &mut None, ) .await + .map_err(BoxedError::new) + .with_context(|_| CreateSinkTableSnafu { + create: create.clone(), + }) } /// Execute a SQL statement on the frontend. diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 4ed42a45fa..eee9eb001f 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -17,7 +17,6 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use api::v1::CreateTableExpr; -use arrow_schema::Fields; use catalog::CatalogManagerRef; use common_error::ext::BoxedError; use common_query::logical_plan::breakup_insert_plan; @@ -102,7 +101,7 @@ fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result Result { - let fields = plan.schema().fields(); - let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan, fields)?; + let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan)?; let mut column_schemas = Vec::new(); - for field in fields { + for field in plan.schema().fields() { let name = field.name(); let ty = ConcreteDataType::from_arrow_type(field.data_type()); let col_schema = if first_time_stamp == Some(name.clone()) { @@ -744,15 +743,40 @@ fn create_table_with_expr( } else { ColumnSchema::new(name, ty, true) }; - column_schemas.push(col_schema); + + match query_type { + QueryType::Sql => { + column_schemas.push(col_schema); + } + QueryType::Tql => { + // if is val column, need to rename as val DOUBLE NULL + // if is tag column, need to cast type as STRING NULL + let is_tag_column = primary_keys.contains(name); + let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name); + if is_val_column { + let col_schema = + ColumnSchema::new("val", ConcreteDataType::float64_datatype(), true); + column_schemas.push(col_schema); + } else if is_tag_column { + let col_schema = + ColumnSchema::new(name, ConcreteDataType::string_datatype(), true); + column_schemas.push(col_schema); + } else { + // time index column + column_schemas.push(col_schema); + } + } + } } - let update_at_schema = ColumnSchema::new( - AUTO_CREATED_UPDATE_AT_TS_COL, - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ); - column_schemas.push(update_at_schema); + if query_type == &QueryType::Sql { + let update_at_schema = ColumnSchema::new( + AUTO_CREATED_UPDATE_AT_TS_COL, + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ); + column_schemas.push(update_at_schema); + } let time_index = if let Some(time_index) = first_time_stamp { time_index @@ -793,8 +817,8 @@ fn create_table_with_expr( /// * `Vec` - other columns which are also in group by clause fn build_primary_key_constraint( plan: &LogicalPlan, - schema: &Fields, ) -> Result<(Option, Vec), Error> { + let fields = plan.schema().fields(); let mut pk_names = FindGroupByFinalName::default(); plan.visit(&mut pk_names) @@ -802,19 +826,23 @@ fn build_primary_key_constraint( context: format!("Can't find aggr expr in plan {plan:?}"), })?; - // if no group by clause, return empty + // if no group by clause, return empty with first timestamp column found in output schema let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default(); if pk_final_names.is_empty() { - return Ok((None, Vec::new())); + let first_ts_col = fields + .iter() + .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()) + .map(|f| f.name().clone()); + return Ok((first_ts_col, Vec::new())); } - let all_pk_cols: Vec<_> = schema + let all_pk_cols: Vec<_> = fields .iter() .filter(|f| pk_final_names.contains(f.name())) .map(|f| f.name().clone()) .collect(); // auto create table use first timestamp column in group by clause as time index - let first_time_stamp = schema + let first_time_stamp = fields .iter() .find(|f| { all_pk_cols.contains(&f.name().clone()) @@ -873,13 +901,13 @@ mod test { ColumnSchema::new( "ts", ConcreteDataType::timestamp_millisecond_datatype(), - true, - ), + false, + ) + .with_time_index(true), update_at_schema.clone(), - ts_placeholder_schema.clone(), ], primary_keys: vec![], - time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(), + time_index: "ts".to_string(), }, TestCase { sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(), @@ -946,6 +974,7 @@ mod test { "public".to_string(), tc.sink_table_name.clone(), ], + &QueryType::Sql, ) .unwrap(); // TODO(discord9): assert expr @@ -954,9 +983,9 @@ mod test { .iter() .map(|c| try_as_column_schema(c).unwrap()) .collect::>(); - assert_eq!(tc.column_schemas, column_schemas); - assert_eq!(tc.primary_keys, expr.primary_keys); - assert_eq!(tc.time_index, expr.time_index); + assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql); + assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql); + assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql); } } } diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index 03bd3a467d..e0708336f1 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -258,56 +258,9 @@ impl AddAutoColumnRewriter { is_rewritten: false, } } -} - -impl TreeNodeRewriter for AddAutoColumnRewriter { - type Node = LogicalPlan; - fn f_down(&mut self, mut node: Self::Node) -> DfResult> { - if self.is_rewritten { - return Ok(Transformed::no(node)); - } - - // if is distinct all, wrap it in a projection - if let LogicalPlan::Distinct(Distinct::All(_)) = &node { - let mut exprs = vec![]; - - for field in node.schema().fields().iter() { - exprs.push(Expr::Column(datafusion::common::Column::new_unqualified( - field.name(), - ))); - } - - let projection = - LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?); - - node = projection; - } - // handle table_scan by wrap it in a projection - else if let LogicalPlan::TableScan(table_scan) = node { - let mut exprs = vec![]; - - for field in table_scan.projected_schema.fields().iter() { - exprs.push(Expr::Column(datafusion::common::Column::new( - Some(table_scan.table_name.clone()), - field.name(), - ))); - } - - let projection = LogicalPlan::Projection(Projection::try_new( - exprs, - Arc::new(LogicalPlan::TableScan(table_scan)), - )?); - - node = projection; - } - - // only do rewrite if found the outermost projection - let mut exprs = if let LogicalPlan::Projection(project) = &node { - project.expr.clone() - } else { - return Ok(Transformed::no(node)); - }; + /// modify the exprs in place so that it matches the schema and some auto columns are added + fn modify_project_exprs(&mut self, mut exprs: Vec) -> DfResult> { let all_names = self .schema .column_schemas() @@ -391,10 +344,76 @@ impl TreeNodeRewriter for AddAutoColumnRewriter { query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas() ))); } + Ok(exprs) + } +} - self.is_rewritten = true; - let new_plan = node.with_new_exprs(exprs, node.inputs().into_iter().cloned().collect())?; - Ok(Transformed::yes(new_plan)) +impl TreeNodeRewriter for AddAutoColumnRewriter { + type Node = LogicalPlan; + fn f_down(&mut self, mut node: Self::Node) -> DfResult> { + if self.is_rewritten { + return Ok(Transformed::no(node)); + } + + // if is distinct all, wrap it in a projection + if let LogicalPlan::Distinct(Distinct::All(_)) = &node { + let mut exprs = vec![]; + + for field in node.schema().fields().iter() { + exprs.push(Expr::Column(datafusion::common::Column::new_unqualified( + field.name(), + ))); + } + + let projection = + LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?); + + node = projection; + } + // handle table_scan by wrap it in a projection + else if let LogicalPlan::TableScan(table_scan) = node { + let mut exprs = vec![]; + + for field in table_scan.projected_schema.fields().iter() { + exprs.push(Expr::Column(datafusion::common::Column::new( + Some(table_scan.table_name.clone()), + field.name(), + ))); + } + + let projection = LogicalPlan::Projection(Projection::try_new( + exprs, + Arc::new(LogicalPlan::TableScan(table_scan)), + )?); + + node = projection; + } + + // only do rewrite if found the outermost projection + // if the outermost node is projection, can rewrite the exprs + // if not, wrap it in a projection + if let LogicalPlan::Projection(project) = &node { + let exprs = project.expr.clone(); + let exprs = self.modify_project_exprs(exprs)?; + + self.is_rewritten = true; + let new_plan = + node.with_new_exprs(exprs, node.inputs().into_iter().cloned().collect())?; + Ok(Transformed::yes(new_plan)) + } else { + // wrap the logical plan in a projection + let mut exprs = vec![]; + for field in node.schema().fields().iter() { + exprs.push(Expr::Column(datafusion::common::Column::new_unqualified( + field.name(), + ))); + } + let exprs = self.modify_project_exprs(exprs)?; + self.is_rewritten = true; + let new_plan = + LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?); + Ok(Transformed::yes(new_plan)) + } } /// We might add new columns, so we need to recompute the schema diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index ad1190eeeb..817cef6a9e 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -16,6 +16,7 @@ use std::any::Any; +use api::v1::CreateTableExpr; use arrow_schema::ArrowError; use common_error::ext::BoxedError; use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; @@ -60,6 +61,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Error encountered while creating sink table for flow: {create:?}"))] + CreateSinkTable { + create: CreateTableExpr, + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Time error"))] Time { source: common_time::error::Error, @@ -331,9 +340,10 @@ impl ErrorExt for Error { | Self::ListFlows { .. } => StatusCode::TableNotFound, Self::FlowNotFound { .. } => StatusCode::FlowNotFound, Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery, - Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => { - StatusCode::EngineExecuteQuery - } + Self::CreateFlow { .. } + | Self::CreateSinkTable { .. } + | Self::Arrow { .. } + | Self::Time { .. } => StatusCode::EngineExecuteQuery, Self::Unexpected { .. } | Self::SyncCheckTask { .. } | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected, diff --git a/tests/cases/distributed/flow-tql/flow_tql.result b/tests/cases/distributed/flow-tql/flow_tql.result index 5edbe5a547..1958a7effd 100644 --- a/tests/cases/distributed/flow-tql/flow_tql.result +++ b/tests/cases/distributed/flow-tql/flow_tql.result @@ -2,7 +2,7 @@ CREATE TABLE http_requests ( ts timestamp(3) time index, host STRING, idc STRING, - val BIGINT, + val DOUBLE, PRIMARY KEY(host, idc), ); @@ -15,21 +15,20 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-------------------------------------------+ -| Table | Create Table | -+----------+-------------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "count(http_requests.val)" BIGINT NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" BIGINT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-------------------------------------------+ ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "val" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-----------------------------------------+ INSERT INTO TABLE http_requests VALUES (now() - '17s'::interval, 'host1', 'idc1', 200), @@ -85,7 +84,7 @@ CREATE TABLE http_requests ( ts timestamp(3) time index, host STRING, idc STRING, - val BIGINT, + val DOUBLE, PRIMARY KEY(host, idc), ); @@ -115,21 +114,20 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-------------------------------------------+ -| Table | Create Table | -+----------+-------------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "count(http_requests.val)" BIGINT NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" BIGINT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-------------------------------------------+ ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "val" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-----------------------------------------+ INSERT INTO TABLE http_requests VALUES (0::Timestamp, 'host1', 'idc1', 200), @@ -160,20 +158,20 @@ ADMIN FLUSH_FLOW('calc_reqs'); | FLOW_FLUSHED | +-------------------------------+ -SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code; +SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code; -+--------------------------+---------------------+-------------+ -| count(http_requests.val) | ts | status_code | -+--------------------------+---------------------+-------------+ -| 3 | 1970-01-01T00:00:00 | 200 | -| 1 | 1970-01-01T00:00:00 | 401 | -| 1 | 1970-01-01T00:00:05 | 401 | -| 2 | 1970-01-01T00:00:05 | 404 | -| 1 | 1970-01-01T00:00:05 | 500 | -| 2 | 1970-01-01T00:00:10 | 200 | -| 2 | 1970-01-01T00:00:10 | 201 | -| 4 | 1970-01-01T00:00:15 | 500 | -+--------------------------+---------------------+-------------+ ++-----+---------------------+-------------+ +| val | ts | status_code | ++-----+---------------------+-------------+ +| 3.0 | 1970-01-01T00:00:00 | 200.0 | +| 1.0 | 1970-01-01T00:00:00 | 401.0 | +| 1.0 | 1970-01-01T00:00:05 | 401.0 | +| 2.0 | 1970-01-01T00:00:05 | 404.0 | +| 1.0 | 1970-01-01T00:00:05 | 500.0 | +| 2.0 | 1970-01-01T00:00:10 | 200.0 | +| 2.0 | 1970-01-01T00:00:10 | 201.0 | +| 4.0 | 1970-01-01T00:00:15 | 500.0 | ++-----+---------------------+-------------+ DROP FLOW calc_reqs; @@ -187,3 +185,66 @@ DROP TABLE cnt_reqs; Affected Rows: 0 +CREATE TABLE http_requests ( + ts timestamp(3) time index, + val DOUBLE, +); + +Affected Rows: 0 + +CREATE FLOW calc_rate SINK TO rate_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]); + +Affected Rows: 0 + +SHOW CREATE TABLE rate_reqs; + ++-----------+------------------------------------------+ +| Table | Create Table | ++-----------+------------------------------------------+ +| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" DOUBLE NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+------------------------------------------+ + +INSERT INTO TABLE http_requests VALUES + (now() - '1m'::interval, 0), + (now() - '30s'::interval, 1), + (now(), 2); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_rate'); + ++-------------------------------+ +| ADMIN FLUSH_FLOW('calc_rate') | ++-------------------------------+ +| FLOW_FLUSHED | ++-------------------------------+ + +SELECT count(*) > 0 FROM rate_reqs; + ++---------------------+ +| count(*) > Int64(0) | ++---------------------+ +| true | ++---------------------+ + +DROP FLOW calc_rate; + +Affected Rows: 0 + +DROP TABLE http_requests; + +Affected Rows: 0 + +DROP TABLE rate_reqs; + +Affected Rows: 0 + diff --git a/tests/cases/distributed/flow-tql/flow_tql.sql b/tests/cases/distributed/flow-tql/flow_tql.sql deleted file mode 100644 index a160e97315..0000000000 --- a/tests/cases/distributed/flow-tql/flow_tql.sql +++ /dev/null @@ -1,92 +0,0 @@ -CREATE TABLE http_requests ( - ts timestamp(3) time index, - host STRING, - idc STRING, - val BIGINT, - PRIMARY KEY(host, idc), -); - -CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS -TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests); - -SHOW CREATE TABLE cnt_reqs; - -INSERT INTO TABLE http_requests VALUES - (now() - '17s'::interval, 'host1', 'idc1', 200), - (now() - '17s'::interval, 'host2', 'idc1', 200), - (now() - '17s'::interval, 'host3', 'idc2', 200), - (now() - '17s'::interval, 'host4', 'idc2', 401), - (now() - '13s'::interval, 'host1', 'idc1', 404), - (now() - '13s'::interval, 'host2', 'idc1', 401), - (now() - '13s'::interval, 'host3', 'idc2', 404), - (now() - '13s'::interval, 'host4', 'idc2', 500), - (now() - '7s'::interval, 'host1', 'idc1', 200), - (now() - '7s'::interval, 'host2', 'idc1', 200), - (now() - '7s'::interval, 'host3', 'idc2', 201), - (now() - '7s'::interval, 'host4', 'idc2', 201), - (now() - '3s'::interval, 'host1', 'idc1', 500), - (now() - '3s'::interval, 'host2', 'idc1', 500), - (now() - '3s'::interval, 'host3', 'idc2', 500), - (now() - '3s'::interval, 'host4', 'idc2', 500); - --- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -ADMIN FLUSH_FLOW('calc_reqs'); - --- too much indeterminsticity in the test, so just check that the flow is running -SELECT count(*) > 0 FROM cnt_reqs; - -DROP FLOW calc_reqs; -DROP TABLE http_requests; -DROP TABLE cnt_reqs; - -CREATE TABLE http_requests ( - ts timestamp(3) time index, - host STRING, - idc STRING, - val BIGINT, - PRIMARY KEY(host, idc), -); - -CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS -TQL EVAL (0, 15, '5s') count_values("status_code", http_requests); - --- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative")) --- so duplicate test into two -CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS -TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests); - -CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS -TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests); - - -CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS -TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests); - -SHOW CREATE TABLE cnt_reqs; - -INSERT INTO TABLE http_requests VALUES - (0::Timestamp, 'host1', 'idc1', 200), - (0::Timestamp, 'host2', 'idc1', 200), - (0::Timestamp, 'host3', 'idc2', 200), - (0::Timestamp, 'host4', 'idc2', 401), - (5000::Timestamp, 'host1', 'idc1', 404), - (5000::Timestamp, 'host2', 'idc1', 401), - (5000::Timestamp, 'host3', 'idc2', 404), - (5000::Timestamp, 'host4', 'idc2', 500), - (10000::Timestamp, 'host1', 'idc1', 200), - (10000::Timestamp, 'host2', 'idc1', 200), - (10000::Timestamp, 'host3', 'idc2', 201), - (10000::Timestamp, 'host4', 'idc2', 201), - (15000::Timestamp, 'host1', 'idc1', 500), - (15000::Timestamp, 'host2', 'idc1', 500), - (15000::Timestamp, 'host3', 'idc2', 500), - (15000::Timestamp, 'host4', 'idc2', 500); - --- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | -ADMIN FLUSH_FLOW('calc_reqs'); - -SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code; - -DROP FLOW calc_reqs; -DROP TABLE http_requests; -DROP TABLE cnt_reqs; diff --git a/tests/cases/distributed/flow-tql/flow_tql.sql b/tests/cases/distributed/flow-tql/flow_tql.sql new file mode 120000 index 0000000000..285056fab1 --- /dev/null +++ b/tests/cases/distributed/flow-tql/flow_tql.sql @@ -0,0 +1 @@ +../../standalone/flow-tql/flow_tql.sql \ No newline at end of file diff --git a/tests/cases/standalone/flow-tql/flow_tql.result b/tests/cases/standalone/flow-tql/flow_tql.result index 6d20bbfcca..fcbfed8651 100644 --- a/tests/cases/standalone/flow-tql/flow_tql.result +++ b/tests/cases/standalone/flow-tql/flow_tql.result @@ -2,7 +2,7 @@ CREATE TABLE http_requests ( ts timestamp(3) time index, host STRING, idc STRING, - val BIGINT, + val DOUBLE, PRIMARY KEY(host, idc), ); @@ -15,21 +15,20 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-------------------------------------------+ -| Table | Create Table | -+----------+-------------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "count(http_requests.val)" BIGINT NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" BIGINT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-------------------------------------------+ ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "val" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-----------------------------------------+ INSERT INTO TABLE http_requests VALUES (now() - '17s'::interval, 'host1', 'idc1', 200), @@ -85,7 +84,7 @@ CREATE TABLE http_requests ( ts timestamp(3) time index, host STRING, idc STRING, - val BIGINT, + val DOUBLE, PRIMARY KEY(host, idc), ); @@ -115,21 +114,20 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-------------------------------------------+ -| Table | Create Table | -+----------+-------------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "count(http_requests.val)" BIGINT NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" BIGINT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-------------------------------------------+ ++----------+-----------------------------------------+ +| Table | Create Table | ++----------+-----------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "val" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-----------------------------------------+ INSERT INTO TABLE http_requests VALUES (0::Timestamp, 'host1', 'idc1', 200), @@ -160,20 +158,20 @@ ADMIN FLUSH_FLOW('calc_reqs'); | FLOW_FLUSHED | +-------------------------------+ -SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code; +SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code; -+--------------------------+---------------------+-------------+ -| count(http_requests.val) | ts | status_code | -+--------------------------+---------------------+-------------+ -| 3 | 1970-01-01T00:00:00 | 200 | -| 1 | 1970-01-01T00:00:00 | 401 | -| 1 | 1970-01-01T00:00:05 | 401 | -| 2 | 1970-01-01T00:00:05 | 404 | -| 1 | 1970-01-01T00:00:05 | 500 | -| 2 | 1970-01-01T00:00:10 | 200 | -| 2 | 1970-01-01T00:00:10 | 201 | -| 4 | 1970-01-01T00:00:15 | 500 | -+--------------------------+---------------------+-------------+ ++-----+---------------------+-------------+ +| val | ts | status_code | ++-----+---------------------+-------------+ +| 3.0 | 1970-01-01T00:00:00 | 200.0 | +| 1.0 | 1970-01-01T00:00:00 | 401.0 | +| 1.0 | 1970-01-01T00:00:05 | 401.0 | +| 2.0 | 1970-01-01T00:00:05 | 404.0 | +| 1.0 | 1970-01-01T00:00:05 | 500.0 | +| 2.0 | 1970-01-01T00:00:10 | 200.0 | +| 2.0 | 1970-01-01T00:00:10 | 201.0 | +| 4.0 | 1970-01-01T00:00:15 | 500.0 | ++-----+---------------------+-------------+ DROP FLOW calc_reqs; @@ -187,3 +185,66 @@ DROP TABLE cnt_reqs; Affected Rows: 0 +CREATE TABLE http_requests ( + ts timestamp(3) time index, + val DOUBLE, +); + +Affected Rows: 0 + +CREATE FLOW calc_rate SINK TO rate_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]); + +Affected Rows: 0 + +SHOW CREATE TABLE rate_reqs; + ++-----------+------------------------------------------+ +| Table | Create Table | ++-----------+------------------------------------------+ +| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" DOUBLE NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+------------------------------------------+ + +INSERT INTO TABLE http_requests VALUES + (now() - '1m'::interval, 0), + (now() - '30s'::interval, 1), + (now(), 2); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_rate'); + ++-------------------------------+ +| ADMIN FLUSH_FLOW('calc_rate') | ++-------------------------------+ +| FLOW_FLUSHED | ++-------------------------------+ + +SELECT count(*) > 0 FROM rate_reqs; + ++---------------------+ +| count(*) > Int64(0) | ++---------------------+ +| true | ++---------------------+ + +DROP FLOW calc_rate; + +Affected Rows: 0 + +DROP TABLE http_requests; + +Affected Rows: 0 + +DROP TABLE rate_reqs; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow-tql/flow_tql.sql b/tests/cases/standalone/flow-tql/flow_tql.sql index a160e97315..c176cbe24b 100644 --- a/tests/cases/standalone/flow-tql/flow_tql.sql +++ b/tests/cases/standalone/flow-tql/flow_tql.sql @@ -2,7 +2,7 @@ CREATE TABLE http_requests ( ts timestamp(3) time index, host STRING, idc STRING, - val BIGINT, + val DOUBLE, PRIMARY KEY(host, idc), ); @@ -43,7 +43,7 @@ CREATE TABLE http_requests ( ts timestamp(3) time index, host STRING, idc STRING, - val BIGINT, + val DOUBLE, PRIMARY KEY(host, idc), ); @@ -85,8 +85,32 @@ INSERT INTO TABLE http_requests VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_reqs'); -SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code; +SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code; DROP FLOW calc_reqs; DROP TABLE http_requests; DROP TABLE cnt_reqs; + +CREATE TABLE http_requests ( + ts timestamp(3) time index, + val DOUBLE, +); + +CREATE FLOW calc_rate SINK TO rate_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]); + +SHOW CREATE TABLE rate_reqs; + +INSERT INTO TABLE http_requests VALUES + (now() - '1m'::interval, 0), + (now() - '30s'::interval, 1), + (now(), 2); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_rate'); + +SELECT count(*) > 0 FROM rate_reqs; + +DROP FLOW calc_rate; +DROP TABLE http_requests; +DROP TABLE rate_reqs;