From 6ee91f6af407ebf335d4b6b72f559b340b871827 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Tue, 2 Sep 2025 16:54:16 +0800 Subject: [PATCH] fix(flow): promql auto create table (#6867) * fix: non aggr prom ql auto create table Signed-off-by: discord9 * feat: val column use any name Signed-off-by: discord9 * feat: check if it's tql src table Signed-off-by: discord9 * test: check sink table is tql-able Signed-off-by: discord9 * test: sqlness redacted Signed-off-by: discord9 * fix: sql also handle no aggr case Signed-off-by: discord9 --------- Signed-off-by: discord9 Signed-off-by: WenyXu --- src/flow/src/batching_mode/engine.rs | 148 ++++++++++- src/flow/src/batching_mode/task.rs | 90 ++++++- .../distributed/flow-tql/flow_tql.result | 242 ++++++++++++++---- .../cases/standalone/flow-tql/flow_tql.result | 242 ++++++++++++++---- tests/cases/standalone/flow-tql/flow_tql.sql | 68 ++++- 5 files changed, 668 insertions(+), 122 deletions(-) diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 7dc6b4c1bc..b2b9012f54 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -29,10 +29,15 @@ use common_runtime::JoinHandle; use common_telemetry::tracing::warn; use common_telemetry::{debug, info}; use common_time::TimeToLive; +use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_expr::LogicalPlan; +use datatypes::prelude::ConcreteDataType; use query::QueryEngineRef; +use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; use sql::parsers::utils::is_tql; use store_api::storage::{RegionId, TableId}; +use table::table_reference::TableReference; use tokio::sync::{oneshot, RwLock}; use crate::batching_mode::frontend_client::FrontendClient; @@ -42,8 +47,8 @@ use crate::batching_mode::utils::sql_to_df_plan; use crate::batching_mode::BatchingModeOptions; use crate::engine::FlowEngine; use crate::error::{ - CreateFlowSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, InvalidQuerySnafu, - TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, + CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, + InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, }; use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW; use crate::{CreateFlowArgs, Error, FlowId, TableName}; @@ -370,13 +375,12 @@ impl BatchingEngine { } })?; let query_ctx = Arc::new(query_ctx); + let is_tql = is_tql(query_ctx.sql_dialect(), &sql) + .map_err(BoxedError::new) + .context(CreateFlowSnafu { sql: &sql })?; // optionally set a eval interval for the flow - if eval_interval.is_none() - && is_tql(query_ctx.sql_dialect(), &sql) - .map_err(BoxedError::new) - .context(CreateFlowSnafu { sql: &sql })? 
-        {
+        if eval_interval.is_none() && is_tql {
             InvalidQuerySnafu {
                 reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
             }
@@ -418,6 +422,11 @@ impl BatchingEngine {
         let (tx, rx) = oneshot::channel();
         let plan =
             sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
+
+        if is_tql {
+            self.check_is_tql_table(&plan, &query_ctx).await?;
+        }
+
         let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
             &plan,
             self.query_engine.engine_state().catalog_manager().clone(),
@@ -484,6 +493,131 @@ impl BatchingEngine {
         Ok(Some(flow_id))
     }
 
+    async fn check_is_tql_table(
+        &self,
+        query: &LogicalPlan,
+        query_ctx: &QueryContext,
+    ) -> Result<(), Error> {
+        struct CollectTableRef {
+            table_refs: HashSet<datafusion_common::TableReference>,
+        }
+
+        impl TreeNodeVisitor<'_> for CollectTableRef {
+            type Node = LogicalPlan;
+            fn f_down(
+                &mut self,
+                node: &Self::Node,
+            ) -> datafusion_common::Result<TreeNodeRecursion> {
+                if let LogicalPlan::TableScan(scan) = node {
+                    self.table_refs.insert(scan.table_name.clone());
+                }
+                Ok(TreeNodeRecursion::Continue)
+            }
+        }
+        let mut table_refs = CollectTableRef {
+            table_refs: HashSet::new(),
+        };
+        query
+            .visit_with_subqueries(&mut table_refs)
+            .context(DatafusionSnafu {
+                context: "Checking if all source tables are TQL tables",
+            })?;
+
+        let default_catalog = query_ctx.current_catalog();
+        let default_schema = query_ctx.current_schema();
+        let default_schema = &default_schema;
+
+        for table_ref in table_refs.table_refs {
+            let table_ref = match &table_ref {
+                datafusion_common::TableReference::Bare { table } => {
+                    TableReference::full(default_catalog, default_schema, table)
+                }
+                datafusion_common::TableReference::Partial { schema, table } => {
+                    TableReference::full(default_catalog, schema, table)
+                }
+                datafusion_common::TableReference::Full {
+                    catalog,
+                    schema,
+                    table,
+                } => TableReference::full(catalog, schema, table),
+            };
+
+            let table_id = self
+                .table_meta
+                .table_name_manager()
+                .get(table_ref.into())
+                .await
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?
+                .with_context(|| UnexpectedSnafu {
+                    reason: format!("Failed to get table id for table: {}", table_ref),
+                })?
+                .table_id();
+            let table_info =
+                get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
+            // first check that the table has exactly one f64 value column
+            let value_cols = table_info
+                .table_info
+                .meta
+                .schema
+                .column_schemas
+                .iter()
+                .filter(|col| col.data_type == ConcreteDataType::float64_datatype())
+                .collect::<Vec<_>>();
+            ensure!(
                value_cols.len() == 1,
+                InvalidQuerySnafu {
+                    reason: format!(
+                        "TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
+                        table_ref,
+                        table_id,
+                        value_cols.len(),
+                        value_cols
+                    ),
+                }
+            );
+            // TODO(discord9): do we need to check that the remaining columns are string tag columns?
+            let pk_idxs = table_info
+                .table_info
+                .meta
+                .primary_key_indices
+                .iter()
+                .collect::<HashSet<_>>();
+
+            for (idx, col) in table_info
+                .table_info
+                .meta
+                .schema
+                .column_schemas
+                .iter()
+                .enumerate()
+            {
+                // three cases:
+                // 1. val column
+                // 2. timestamp column
+                // 3. tag column (string)
+
+                let is_pk: bool = pk_idxs.contains(&&idx);
+
+                ensure!(
+                    col.data_type == ConcreteDataType::float64_datatype()
+                        || col.data_type.is_timestamp()
+                        || (col.data_type == ConcreteDataType::string_datatype() && is_pk),
+                    InvalidQuerySnafu {
+                        reason: format!(
+                            "TQL query only supports f64 value column, timestamp column and string tag columns, table `{}`(id={}) has column `{}` with type {:?} which is not supported",
+                            table_ref,
+                            table_id,
+                            col.name,
+                            col.data_type
+                        ),
+                    }
+                );
+            }
+        }
+        Ok(())
+    }
+
     pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
         if self.tasks.write().await.remove(&flow_id).is_none() {
             warn!("Flow {flow_id} not found in tasks");
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index cdd2f635e5..576c75d3cf 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -732,7 +732,25 @@ fn create_table_with_expr(
     sink_table_name: &[String; 3],
     query_type: &QueryType,
 ) -> Result {
-    let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan)?;
+    let table_def = match query_type {
+        &QueryType::Sql => {
+            if let Some(def) = build_pk_from_aggr(plan)? {
+                def
+            } else {
+                build_by_sql_schema(plan)?
+            }
+        }
+        QueryType::Tql => {
+            // first try building from the aggregation, then fall back to the TQL schema since a TQL query might not have an aggregation node
+            if let Some(table_def) = build_pk_from_aggr(plan)? {
+                table_def
+            } else {
+                build_by_tql_schema(plan)?
+            }
+        }
+    };
+    let first_time_stamp = table_def.ts_col;
+    let primary_keys = table_def.pks;
 
     let mut column_schemas = Vec::new();
     for field in plan.schema().fields() {
@@ -755,7 +773,7 @@ fn create_table_with_expr(
             let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
             if is_val_column {
                 let col_schema =
-                    ColumnSchema::new("val", ConcreteDataType::float64_datatype(), true);
+                    ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
                 column_schemas.push(col_schema);
             } else if is_tag_column {
                 let col_schema =
@@ -809,15 +827,63 @@ fn create_table_with_expr(
     })
 }
 
+/// Simply build from the plan schema: return the first timestamp column and no primary key
+fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
+    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
+        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
+            Some(f.name().clone())
+        } else {
+            None
+        }
+    });
+    Ok(TableDef {
+        ts_col: first_time_stamp,
+        pks: vec![],
+    })
+}
+
+/// Return the first timestamp column found in the output schema and all string columns as primary keys
+fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
+    let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
+        if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
+            Some(f.name().clone())
+        } else {
+            None
+        }
+    });
+    let string_columns = plan
+        .schema()
+        .fields()
+        .iter()
+        .filter_map(|f| {
+            if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
+                Some(f.name().clone())
+            } else {
+                None
+            }
+        })
+        .collect::<Vec<_>>();
+
+    Ok(TableDef {
+        ts_col: first_time_stamp,
+        pks: string_columns,
+    })
+}
+
+struct TableDef {
+    ts_col: Option<String>,
+    pks: Vec<String>,
+}
+
 /// Return first timestamp column which is in group by clause and other columns which are also in group by clause
 ///
 /// # Returns
 ///
 /// * `Option<String>` - first timestamp column which is in group by clause
 /// * `Vec<String>` - other columns which are also in group by clause
-fn build_primary_key_constraint(
-    plan: &LogicalPlan,
-) -> Result<(Option<String>, Vec<String>), Error> {
+///
+/// if no
aggregation found, return None +fn build_pk_from_aggr(plan: &LogicalPlan) -> Result, Error> { let fields = plan.schema().fields(); let mut pk_names = FindGroupByFinalName::default(); @@ -827,13 +893,18 @@ fn build_primary_key_constraint( })?; // if no group by clause, return empty with first timestamp column found in output schema - let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default(); + let Some(pk_final_names) = pk_names.get_group_expr_names() else { + return Ok(None); + }; if pk_final_names.is_empty() { let first_ts_col = fields .iter() .find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp()) .map(|f| f.name().clone()); - return Ok((first_ts_col, Vec::new())); + return Ok(Some(TableDef { + ts_col: first_ts_col, + pks: vec![], + })); } let all_pk_cols: Vec<_> = fields @@ -855,7 +926,10 @@ fn build_primary_key_constraint( .filter(|col| first_time_stamp != Some(col.to_string())) .collect(); - Ok((first_time_stamp, all_pk_cols)) + Ok(Some(TableDef { + ts_col: first_time_stamp, + pks: all_pk_cols, + })) } #[cfg(test)] diff --git a/tests/cases/distributed/flow-tql/flow_tql.result b/tests/cases/distributed/flow-tql/flow_tql.result index 1958a7effd..6afffc6edb 100644 --- a/tests/cases/distributed/flow-tql/flow_tql.result +++ b/tests/cases/distributed/flow-tql/flow_tql.result @@ -15,20 +15,26 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-----------------------------------------+ -| Table | Create Table | -+----------+-----------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "val" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-----------------------------------------+ ++----------+-------------------------------------------+ +| Table | Create Table | ++----------+-------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "count(http_requests.val)" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-------------------------------------------+ + +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); + +++ +++ INSERT INTO TABLE http_requests VALUES (now() - '17s'::interval, 'host1', 'idc1', 200), @@ -80,6 +86,43 @@ DROP TABLE cnt_reqs; Affected Rows: 0 +CREATE TABLE http_requests_two_vals ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val DOUBLE, + valb DOUBLE, + PRIMARY KEY(host, idc), +); + +Affected Rows: 0 + +-- should failed with two value columns error +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals); + +Error: 3001(EngineExecuteQuery), Unsupported expr type: count_values on multi-value input + +-- should failed with two value columns error +-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED] +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]); + +Error: 3001(EngineExecuteQuery), Invalid query: TQL query only supports one f64 value column, table `greptime.public.http_requests_two_vals`(id=[REDACTED]) has 2 f64 value columns, columns are: [val Float64 null, valb Float64 null] 
+ +SHOW TABLES; + ++------------------------+ +| Tables | ++------------------------+ +| http_requests_two_vals | +| numbers | ++------------------------+ + +DROP TABLE http_requests_two_vals; + +Affected Rows: 0 + CREATE TABLE http_requests ( ts timestamp(3) time index, host STRING, @@ -114,20 +157,26 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-----------------------------------------+ -| Table | Create Table | -+----------+-----------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "val" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-----------------------------------------+ ++----------+-------------------------------------------+ +| Table | Create Table | ++----------+-------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "count(http_requests.val)" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-------------------------------------------+ + +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); + +++ +++ INSERT INTO TABLE http_requests VALUES (0::Timestamp, 'host1', 'idc1', 200), @@ -158,20 +207,20 @@ ADMIN FLUSH_FLOW('calc_reqs'); | FLOW_FLUSHED | +-------------------------------+ -SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code; +SELECT * FROM cnt_reqs ORDER BY ts, status_code; -+-----+---------------------+-------------+ -| val | ts | status_code | -+-----+---------------------+-------------+ -| 3.0 | 1970-01-01T00:00:00 | 200.0 | -| 1.0 | 1970-01-01T00:00:00 | 401.0 | -| 1.0 | 1970-01-01T00:00:05 | 401.0 | -| 2.0 | 1970-01-01T00:00:05 | 404.0 | -| 1.0 | 1970-01-01T00:00:05 | 500.0 | -| 2.0 | 1970-01-01T00:00:10 | 200.0 | -| 2.0 | 1970-01-01T00:00:10 | 201.0 | -| 4.0 | 1970-01-01T00:00:15 | 500.0 | -+-----+---------------------+-------------+ ++--------------------------+---------------------+-------------+ +| count(http_requests.val) | ts | status_code | ++--------------------------+---------------------+-------------+ +| 3.0 | 1970-01-01T00:00:00 | 200.0 | +| 1.0 | 1970-01-01T00:00:00 | 401.0 | +| 1.0 | 1970-01-01T00:00:05 | 401.0 | +| 2.0 | 1970-01-01T00:00:05 | 404.0 | +| 1.0 | 1970-01-01T00:00:05 | 500.0 | +| 2.0 | 1970-01-01T00:00:10 | 200.0 | +| 2.0 | 1970-01-01T00:00:10 | 201.0 | +| 4.0 | 1970-01-01T00:00:15 | 500.0 | ++--------------------------+---------------------+-------------+ DROP FLOW calc_reqs; @@ -199,18 +248,24 @@ Affected Rows: 0 SHOW CREATE TABLE rate_reqs; -+-----------+------------------------------------------+ -| Table | Create Table | -+-----------+------------------------------------------+ -| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "val" DOUBLE NULL, | -| | TIME INDEX ("ts") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-----------+------------------------------------------+ ++-----------+-----------------------------------------------------------+ +| Table | Create Table | ++-----------+-----------------------------------------------------------+ +| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "prom_rate(ts_range,val,ts,Int64(300000))" DOUBLE NULL, | 
+| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+-----------------------------------------------------------+ + +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs); + +++ +++ INSERT INTO TABLE http_requests VALUES (now() - '1m'::interval, 0), @@ -248,3 +303,84 @@ DROP TABLE rate_reqs; Affected Rows: 0 +CREATE TABLE http_requests_total ( + host STRING, + job STRING, + instance STRING, + byte DOUBLE, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY (host, job, instance) +); + +Affected Rows: 0 + +CREATE FLOW calc_rate +SINK TO rate_reqs +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]); + +Affected Rows: 0 + +SHOW CREATE TABLE rate_reqs; + ++-----------+-----------------------------------------------------------+ +| Table | Create Table | ++-----------+-----------------------------------------------------------+ +| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "prom_rate(ts_range,byte,ts,Int64(60000))" DOUBLE NULL, | +| | "host" STRING NULL, | +| | "job" STRING NULL, | +| | "instance" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host", "job", "instance") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+-----------------------------------------------------------+ + +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs); + +++ +++ + +INSERT INTO TABLE http_requests_total VALUES + ('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval), + ('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval), + ('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval), + ('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval), + ('localhost', 'my_service', 'instance1', 400, now()); + +Affected Rows: 5 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_rate'); + ++-------------------------------+ +| ADMIN FLUSH_FLOW('calc_rate') | ++-------------------------------+ +| FLOW_FLUSHED | ++-------------------------------+ + +SELECT count(*)>0 FROM rate_reqs; + ++---------------------+ +| count(*) > Int64(0) | ++---------------------+ +| true | ++---------------------+ + +DROP FLOW calc_rate; + +Affected Rows: 0 + +DROP TABLE http_requests_total; + +Affected Rows: 0 + +DROP TABLE rate_reqs; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow-tql/flow_tql.result b/tests/cases/standalone/flow-tql/flow_tql.result index fcbfed8651..6fb9386e83 100644 --- a/tests/cases/standalone/flow-tql/flow_tql.result +++ b/tests/cases/standalone/flow-tql/flow_tql.result @@ -15,20 +15,26 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-----------------------------------------+ -| Table | Create Table | -+----------+-----------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "val" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-----------------------------------------+ ++----------+-------------------------------------------+ +| Table | Create Table | ++----------+-------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | 
"count(http_requests.val)" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-------------------------------------------+ + +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); + +++ +++ INSERT INTO TABLE http_requests VALUES (now() - '17s'::interval, 'host1', 'idc1', 200), @@ -80,6 +86,43 @@ DROP TABLE cnt_reqs; Affected Rows: 0 +CREATE TABLE http_requests_two_vals ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val DOUBLE, + valb DOUBLE, + PRIMARY KEY(host, idc), +); + +Affected Rows: 0 + +-- should failed with two value columns error +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals); + +Error: 3001(EngineExecuteQuery), Unsupported expr type: count_values on multi-value input + +-- should failed with two value columns error +-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED] +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]); + +Error: 3001(EngineExecuteQuery), Invalid query: TQL query only supports one f64 value column, table `greptime.public.http_requests_two_vals`(id=[REDACTED]) has 2 f64 value columns, columns are: [val Float64 null, valb Float64 null] + +SHOW TABLES; + ++------------------------+ +| Tables | ++------------------------+ +| http_requests_two_vals | +| numbers | ++------------------------+ + +DROP TABLE http_requests_two_vals; + +Affected Rows: 0 + CREATE TABLE http_requests ( ts timestamp(3) time index, host STRING, @@ -114,20 +157,26 @@ Affected Rows: 0 SHOW CREATE TABLE cnt_reqs; -+----------+-----------------------------------------+ -| Table | Create Table | -+----------+-----------------------------------------+ -| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | -| | "val" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "status_code" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("status_code") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+----------+-----------------------------------------+ ++----------+-------------------------------------------+ +| Table | Create Table | ++----------+-------------------------------------------+ +| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( | +| | "count(http_requests.val)" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "status_code" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("status_code") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------+-------------------------------------------+ + +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); + +++ +++ INSERT INTO TABLE http_requests VALUES (0::Timestamp, 'host1', 'idc1', 200), @@ -158,20 +207,20 @@ ADMIN FLUSH_FLOW('calc_reqs'); | FLOW_FLUSHED | +-------------------------------+ -SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code; +SELECT * FROM cnt_reqs ORDER BY ts, status_code; -+-----+---------------------+-------------+ -| val | ts | status_code | -+-----+---------------------+-------------+ -| 3.0 | 1970-01-01T00:00:00 | 200.0 | -| 1.0 | 1970-01-01T00:00:00 | 401.0 | -| 1.0 | 1970-01-01T00:00:05 | 401.0 | -| 2.0 | 1970-01-01T00:00:05 | 404.0 | -| 1.0 | 1970-01-01T00:00:05 | 500.0 | -| 2.0 | 
1970-01-01T00:00:10 | 200.0 | -| 2.0 | 1970-01-01T00:00:10 | 201.0 | -| 4.0 | 1970-01-01T00:00:15 | 500.0 | -+-----+---------------------+-------------+ ++--------------------------+---------------------+-------------+ +| count(http_requests.val) | ts | status_code | ++--------------------------+---------------------+-------------+ +| 3.0 | 1970-01-01T00:00:00 | 200.0 | +| 1.0 | 1970-01-01T00:00:00 | 401.0 | +| 1.0 | 1970-01-01T00:00:05 | 401.0 | +| 2.0 | 1970-01-01T00:00:05 | 404.0 | +| 1.0 | 1970-01-01T00:00:05 | 500.0 | +| 2.0 | 1970-01-01T00:00:10 | 200.0 | +| 2.0 | 1970-01-01T00:00:10 | 201.0 | +| 4.0 | 1970-01-01T00:00:15 | 500.0 | ++--------------------------+---------------------+-------------+ DROP FLOW calc_reqs; @@ -199,18 +248,24 @@ Affected Rows: 0 SHOW CREATE TABLE rate_reqs; -+-----------+------------------------------------------+ -| Table | Create Table | -+-----------+------------------------------------------+ -| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "val" DOUBLE NULL, | -| | TIME INDEX ("ts") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-----------+------------------------------------------+ ++-----------+-----------------------------------------------------------+ +| Table | Create Table | ++-----------+-----------------------------------------------------------+ +| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "prom_rate(ts_range,val,ts,Int64(300000))" DOUBLE NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+-----------------------------------------------------------+ + +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs); + +++ +++ INSERT INTO TABLE http_requests VALUES (now() - '1m'::interval, 0), @@ -248,3 +303,84 @@ DROP TABLE rate_reqs; Affected Rows: 0 +CREATE TABLE http_requests_total ( + host STRING, + job STRING, + instance STRING, + byte DOUBLE, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY (host, job, instance) +); + +Affected Rows: 0 + +CREATE FLOW calc_rate +SINK TO rate_reqs +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]); + +Affected Rows: 0 + +SHOW CREATE TABLE rate_reqs; + ++-----------+-----------------------------------------------------------+ +| Table | Create Table | ++-----------+-----------------------------------------------------------+ +| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "prom_rate(ts_range,byte,ts,Int64(60000))" DOUBLE NULL, | +| | "host" STRING NULL, | +| | "job" STRING NULL, | +| | "instance" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host", "job", "instance") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+-----------------------------------------------------------+ + +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs); + +++ +++ + +INSERT INTO TABLE http_requests_total VALUES + ('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval), + ('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval), + ('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval), + ('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval), + ('localhost', 'my_service', 'instance1', 400, now()); + +Affected Rows: 5 + +-- SQLNESS REPLACE 
(ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_rate'); + ++-------------------------------+ +| ADMIN FLUSH_FLOW('calc_rate') | ++-------------------------------+ +| FLOW_FLUSHED | ++-------------------------------+ + +SELECT count(*)>0 FROM rate_reqs; + ++---------------------+ +| count(*) > Int64(0) | ++---------------------+ +| true | ++---------------------+ + +DROP FLOW calc_rate; + +Affected Rows: 0 + +DROP TABLE http_requests_total; + +Affected Rows: 0 + +DROP TABLE rate_reqs; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow-tql/flow_tql.sql b/tests/cases/standalone/flow-tql/flow_tql.sql index c176cbe24b..c32c10ff3a 100644 --- a/tests/cases/standalone/flow-tql/flow_tql.sql +++ b/tests/cases/standalone/flow-tql/flow_tql.sql @@ -11,6 +11,9 @@ TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_ SHOW CREATE TABLE cnt_reqs; +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); + INSERT INTO TABLE http_requests VALUES (now() - '17s'::interval, 'host1', 'idc1', 200), (now() - '17s'::interval, 'host2', 'idc1', 200), @@ -39,6 +42,28 @@ DROP FLOW calc_reqs; DROP TABLE http_requests; DROP TABLE cnt_reqs; +CREATE TABLE http_requests_two_vals ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val DOUBLE, + valb DOUBLE, + PRIMARY KEY(host, idc), +); + +-- should failed with two value columns error +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals); + +-- should failed with two value columns error +-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED] +CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]); + +SHOW TABLES; + +DROP TABLE http_requests_two_vals; + CREATE TABLE http_requests ( ts timestamp(3) time index, host STRING, @@ -64,6 +89,9 @@ TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("stat SHOW CREATE TABLE cnt_reqs; +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs); + INSERT INTO TABLE http_requests VALUES (0::Timestamp, 'host1', 'idc1', 200), (0::Timestamp, 'host2', 'idc1', 200), @@ -85,7 +113,7 @@ INSERT INTO TABLE http_requests VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('calc_reqs'); -SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code; +SELECT * FROM cnt_reqs ORDER BY ts, status_code; DROP FLOW calc_reqs; DROP TABLE http_requests; @@ -101,6 +129,9 @@ TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]); SHOW CREATE TABLE rate_reqs; +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs); + INSERT INTO TABLE http_requests VALUES (now() - '1m'::interval, 0), (now() - '30s'::interval, 1), @@ -114,3 +145,38 @@ SELECT count(*) > 0 FROM rate_reqs; DROP FLOW calc_rate; DROP TABLE http_requests; DROP TABLE rate_reqs; + +CREATE TABLE http_requests_total ( + host STRING, + job STRING, + instance STRING, + byte DOUBLE, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY (host, job, instance) +); + +CREATE FLOW calc_rate +SINK TO rate_reqs +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]); + 
+SHOW CREATE TABLE rate_reqs; + +-- test if sink table is tql queryable +TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs); + +INSERT INTO TABLE http_requests_total VALUES + ('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval), + ('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval), + ('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval), + ('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval), + ('localhost', 'my_service', 'instance1', 400, now()); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('calc_rate'); + +SELECT count(*)>0 FROM rate_reqs; + +DROP FLOW calc_rate; +DROP TABLE http_requests_total; +DROP TABLE rate_reqs;
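
Usage note for the tests above: with this change, the auto-created sink table keeps the query expression's name for its value column (for example `count(http_requests.val)` or `prom_rate(ts_range,byte,ts,Int64(60000))`) instead of a hard-coded `val`. A minimal sketch of how such a column could be referenced directly, reusing the `cnt_reqs` sink table from the tests above; the quoted identifier and the assumed alias `cnt` are illustrative and must match whatever SHOW CREATE TABLE reports:

-- Illustrative only: the expression-shaped column name has to be double-quoted,
-- otherwise it would parse as a function call.
SELECT "count(http_requests.val)" AS cnt, ts, status_code
FROM cnt_reqs
ORDER BY ts, status_code;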