fix(flow): promql auto create table (#6867)

* fix: non aggr prom ql auto create table

Signed-off-by: discord9 <discord9@163.com>

* feat: val column use any name

Signed-off-by: discord9 <discord9@163.com>

* feat: check if it's tql src table

Signed-off-by: discord9 <discord9@163.com>

* test: check sink table is tql-able

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness redacted

Signed-off-by: discord9 <discord9@163.com>

* fix: sql also handle no aggr case

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
discord9
2025-09-02 16:54:16 +08:00
committed by Weny Xu
parent 9175fa643d
commit 6ee91f6af4
5 changed files with 668 additions and 122 deletions

View File

@@ -29,10 +29,15 @@ use common_runtime::JoinHandle;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::TimeToLive;
use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;
use query::QueryEngineRef;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use sql::parsers::utils::is_tql;
use store_api::storage::{RegionId, TableId};
use table::table_reference::TableReference;
use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,8 +47,8 @@ use crate::batching_mode::utils::sql_to_df_plan;
use crate::batching_mode::BatchingModeOptions;
use crate::engine::FlowEngine;
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, InvalidQuerySnafu,
TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
use crate::{CreateFlowArgs, Error, FlowId, TableName};
@@ -370,13 +375,12 @@ impl BatchingEngine {
}
})?;
let query_ctx = Arc::new(query_ctx);
let is_tql = is_tql(query_ctx.sql_dialect(), &sql)
.map_err(BoxedError::new)
.context(CreateFlowSnafu { sql: &sql })?;
// optionally set a eval interval for the flow
if eval_interval.is_none()
&& is_tql(query_ctx.sql_dialect(), &sql)
.map_err(BoxedError::new)
.context(CreateFlowSnafu { sql: &sql })?
{
if eval_interval.is_none() && is_tql {
InvalidQuerySnafu {
reason: "TQL query requires EVAL INTERVAL to be set".to_string(),
}
@@ -418,6 +422,11 @@ impl BatchingEngine {
let (tx, rx) = oneshot::channel();
let plan = sql_to_df_plan(query_ctx.clone(), self.query_engine.clone(), &sql, true).await?;
if is_tql {
self.check_is_tql_table(&plan, &query_ctx).await?;
}
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
&plan,
self.query_engine.engine_state().catalog_manager().clone(),
@@ -484,6 +493,131 @@ impl BatchingEngine {
Ok(Some(flow_id))
}
async fn check_is_tql_table(
&self,
query: &LogicalPlan,
query_ctx: &QueryContext,
) -> Result<(), Error> {
struct CollectTableRef {
table_refs: HashSet<datafusion_common::TableReference>,
}
impl TreeNodeVisitor<'_> for CollectTableRef {
type Node = LogicalPlan;
fn f_down(
&mut self,
node: &Self::Node,
) -> datafusion_common::Result<TreeNodeRecursion> {
if let LogicalPlan::TableScan(scan) = node {
self.table_refs.insert(scan.table_name.clone());
}
Ok(TreeNodeRecursion::Continue)
}
}
let mut table_refs = CollectTableRef {
table_refs: HashSet::new(),
};
query
.visit_with_subqueries(&mut table_refs)
.context(DatafusionSnafu {
context: "Checking if all source tables are TQL tables",
})?;
let default_catalog = query_ctx.current_catalog();
let default_schema = query_ctx.current_schema();
let default_schema = &default_schema;
for table_ref in table_refs.table_refs {
let table_ref = match &table_ref {
datafusion_common::TableReference::Bare { table } => {
TableReference::full(default_catalog, default_schema, table)
}
datafusion_common::TableReference::Partial { schema, table } => {
TableReference::full(default_catalog, schema, table)
}
datafusion_common::TableReference::Full {
catalog,
schema,
table,
} => TableReference::full(catalog, schema, table),
};
let table_id = self
.table_meta
.table_name_manager()
.get(table_ref.into())
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.with_context(|| UnexpectedSnafu {
reason: format!("Failed to get table id for table: {}", table_ref),
})?
.table_id();
let table_info =
get_table_info(self.table_meta.table_info_manager(), &table_id).await?;
// first check if it's only one f64 value column
let value_cols = table_info
.table_info
.meta
.schema
.column_schemas
.iter()
.filter(|col| col.data_type == ConcreteDataType::float64_datatype())
.collect::<Vec<_>>();
ensure!(
value_cols.len() == 1,
InvalidQuerySnafu {
reason: format!(
"TQL query only supports one f64 value column, table `{}`(id={}) has {} f64 value columns, columns are: {:?}",
table_ref,
table_id,
value_cols.len(),
value_cols
),
}
);
// TODO(discord9): do need to check rest columns is string and is tag column?
let pk_idxs = table_info
.table_info
.meta
.primary_key_indices
.iter()
.collect::<HashSet<_>>();
for (idx, col) in table_info
.table_info
.meta
.schema
.column_schemas
.iter()
.enumerate()
{
// three cases:
// 1. val column
// 2. timestamp column
// 3. tag column (string)
let is_pk: bool = pk_idxs.contains(&&idx);
ensure!(
col.data_type == ConcreteDataType::float64_datatype()
|| col.data_type.is_timestamp()
|| (col.data_type == ConcreteDataType::string_datatype() && is_pk),
InvalidQuerySnafu {
reason: format!(
"TQL query only supports f64 value column, timestamp column and string tag columns, table `{}`(id={}) has column `{}` with type {:?} which is not supported",
table_ref,
table_id,
col.name,
col.data_type
),
}
);
}
}
Ok(())
}
pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> {
if self.tasks.write().await.remove(&flow_id).is_none() {
warn!("Flow {flow_id} not found in tasks");

View File

@@ -732,7 +732,25 @@ fn create_table_with_expr(
sink_table_name: &[String; 3],
query_type: &QueryType,
) -> Result<CreateTableExpr, Error> {
let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan)?;
let table_def = match query_type {
&QueryType::Sql => {
if let Some(def) = build_pk_from_aggr(plan)? {
def
} else {
build_by_sql_schema(plan)?
}
}
QueryType::Tql => {
// first try build from aggr, then from tql schema because tql query might not have aggr node
if let Some(table_def) = build_pk_from_aggr(plan)? {
table_def
} else {
build_by_tql_schema(plan)?
}
}
};
let first_time_stamp = table_def.ts_col;
let primary_keys = table_def.pks;
let mut column_schemas = Vec::new();
for field in plan.schema().fields() {
@@ -755,7 +773,7 @@ fn create_table_with_expr(
let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
if is_val_column {
let col_schema =
ColumnSchema::new("val", ConcreteDataType::float64_datatype(), true);
ColumnSchema::new(name, ConcreteDataType::float64_datatype(), true);
column_schemas.push(col_schema);
} else if is_tag_column {
let col_schema =
@@ -809,15 +827,63 @@ fn create_table_with_expr(
})
}
/// simply build by schema, return first timestamp column and no primary key
fn build_by_sql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
Some(f.name().clone())
} else {
None
}
});
Ok(TableDef {
ts_col: first_time_stamp,
pks: vec![],
})
}
/// Return first timestamp column found in output schema and all string columns
fn build_by_tql_schema(plan: &LogicalPlan) -> Result<TableDef, Error> {
let first_time_stamp = plan.schema().fields().iter().find_map(|f| {
if ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp() {
Some(f.name().clone())
} else {
None
}
});
let string_columns = plan
.schema()
.fields()
.iter()
.filter_map(|f| {
if ConcreteDataType::from_arrow_type(f.data_type()).is_string() {
Some(f.name().clone())
} else {
None
}
})
.collect::<Vec<_>>();
Ok(TableDef {
ts_col: first_time_stamp,
pks: string_columns,
})
}
struct TableDef {
ts_col: Option<String>,
pks: Vec<String>,
}
/// Return first timestamp column which is in group by clause and other columns which are also in group by clause
///
/// # Returns
///
/// * `Option<String>` - first timestamp column which is in group by clause
/// * `Vec<String>` - other columns which are also in group by clause
fn build_primary_key_constraint(
plan: &LogicalPlan,
) -> Result<(Option<String>, Vec<String>), Error> {
///
/// if no aggregation found, return None
fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
let fields = plan.schema().fields();
let mut pk_names = FindGroupByFinalName::default();
@@ -827,13 +893,18 @@ fn build_primary_key_constraint(
})?;
// if no group by clause, return empty with first timestamp column found in output schema
let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
let Some(pk_final_names) = pk_names.get_group_expr_names() else {
return Ok(None);
};
if pk_final_names.is_empty() {
let first_ts_col = fields
.iter()
.find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
.map(|f| f.name().clone());
return Ok((first_ts_col, Vec::new()));
return Ok(Some(TableDef {
ts_col: first_ts_col,
pks: vec![],
}));
}
let all_pk_cols: Vec<_> = fields
@@ -855,7 +926,10 @@ fn build_primary_key_constraint(
.filter(|col| first_time_stamp != Some(col.to_string()))
.collect();
Ok((first_time_stamp, all_pk_cols))
Ok(Some(TableDef {
ts_col: first_time_stamp,
pks: all_pk_cols,
}))
}
#[cfg(test)]

View File

@@ -15,20 +15,26 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
++
++
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
@@ -80,6 +86,43 @@ DROP TABLE cnt_reqs;
Affected Rows: 0
CREATE TABLE http_requests_two_vals (
ts timestamp(3) time index,
host STRING,
idc STRING,
val DOUBLE,
valb DOUBLE,
PRIMARY KEY(host, idc),
);
Affected Rows: 0
-- should failed with two value columns error
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals);
Error: 3001(EngineExecuteQuery), Unsupported expr type: count_values on multi-value input
-- should failed with two value columns error
-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED]
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]);
Error: 3001(EngineExecuteQuery), Invalid query: TQL query only supports one f64 value column, table `greptime.public.http_requests_two_vals`(id=[REDACTED]) has 2 f64 value columns, columns are: [val Float64 null, valb Float64 null]
SHOW TABLES;
+------------------------+
| Tables |
+------------------------+
| http_requests_two_vals |
| numbers |
+------------------------+
DROP TABLE http_requests_two_vals;
Affected Rows: 0
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
@@ -114,20 +157,26 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
++
++
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
@@ -158,20 +207,20 @@ ADMIN FLUSH_FLOW('calc_reqs');
| FLOW_FLUSHED |
+-------------------------------+
SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
SELECT * FROM cnt_reqs ORDER BY ts, status_code;
+-----+---------------------+-------------+
| val | ts | status_code |
+-----+---------------------+-------------+
| 3.0 | 1970-01-01T00:00:00 | 200.0 |
| 1.0 | 1970-01-01T00:00:00 | 401.0 |
| 1.0 | 1970-01-01T00:00:05 | 401.0 |
| 2.0 | 1970-01-01T00:00:05 | 404.0 |
| 1.0 | 1970-01-01T00:00:05 | 500.0 |
| 2.0 | 1970-01-01T00:00:10 | 200.0 |
| 2.0 | 1970-01-01T00:00:10 | 201.0 |
| 4.0 | 1970-01-01T00:00:15 | 500.0 |
+-----+---------------------+-------------+
+--------------------------+---------------------+-------------+
| count(http_requests.val) | ts | status_code |
+--------------------------+---------------------+-------------+
| 3.0 | 1970-01-01T00:00:00 | 200.0 |
| 1.0 | 1970-01-01T00:00:00 | 401.0 |
| 1.0 | 1970-01-01T00:00:05 | 401.0 |
| 2.0 | 1970-01-01T00:00:05 | 404.0 |
| 1.0 | 1970-01-01T00:00:05 | 500.0 |
| 2.0 | 1970-01-01T00:00:10 | 200.0 |
| 2.0 | 1970-01-01T00:00:10 | 201.0 |
| 4.0 | 1970-01-01T00:00:15 | 500.0 |
+--------------------------+---------------------+-------------+
DROP FLOW calc_reqs;
@@ -199,18 +248,24 @@ Affected Rows: 0
SHOW CREATE TABLE rate_reqs;
+-----------+------------------------------------------+
| Table | Create Table |
+-----------+------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+------------------------------------------+
+-----------+-----------------------------------------------------------+
| Table | Create Table |
+-----------+-----------------------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "prom_rate(ts_range,val,ts,Int64(300000))" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
++
++
INSERT INTO TABLE http_requests VALUES
(now() - '1m'::interval, 0),
@@ -248,3 +303,84 @@ DROP TABLE rate_reqs;
Affected Rows: 0
CREATE TABLE http_requests_total (
host STRING,
job STRING,
instance STRING,
byte DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (host, job, instance)
);
Affected Rows: 0
CREATE FLOW calc_rate
SINK TO rate_reqs
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]);
Affected Rows: 0
SHOW CREATE TABLE rate_reqs;
+-----------+-----------------------------------------------------------+
| Table | Create Table |
+-----------+-----------------------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "prom_rate(ts_range,byte,ts,Int64(60000))" DOUBLE NULL, |
| | "host" STRING NULL, |
| | "job" STRING NULL, |
| | "instance" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host", "job", "instance") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
++
++
INSERT INTO TABLE http_requests_total VALUES
('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval),
('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval),
('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval),
('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval),
('localhost', 'my_service', 'instance1', 400, now());
Affected Rows: 5
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_rate');
+-------------------------------+
| ADMIN FLUSH_FLOW('calc_rate') |
+-------------------------------+
| FLOW_FLUSHED |
+-------------------------------+
SELECT count(*)>0 FROM rate_reqs;
+---------------------+
| count(*) > Int64(0) |
+---------------------+
| true |
+---------------------+
DROP FLOW calc_rate;
Affected Rows: 0
DROP TABLE http_requests_total;
Affected Rows: 0
DROP TABLE rate_reqs;
Affected Rows: 0

View File

@@ -15,20 +15,26 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
++
++
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
@@ -80,6 +86,43 @@ DROP TABLE cnt_reqs;
Affected Rows: 0
CREATE TABLE http_requests_two_vals (
ts timestamp(3) time index,
host STRING,
idc STRING,
val DOUBLE,
valb DOUBLE,
PRIMARY KEY(host, idc),
);
Affected Rows: 0
-- should failed with two value columns error
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals);
Error: 3001(EngineExecuteQuery), Unsupported expr type: count_values on multi-value input
-- should failed with two value columns error
-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED]
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]);
Error: 3001(EngineExecuteQuery), Invalid query: TQL query only supports one f64 value column, table `greptime.public.http_requests_two_vals`(id=[REDACTED]) has 2 f64 value columns, columns are: [val Float64 null, valb Float64 null]
SHOW TABLES;
+------------------------+
| Tables |
+------------------------+
| http_requests_two_vals |
| numbers |
+------------------------+
DROP TABLE http_requests_two_vals;
Affected Rows: 0
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
@@ -114,20 +157,26 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
++
++
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
@@ -158,20 +207,20 @@ ADMIN FLUSH_FLOW('calc_reqs');
| FLOW_FLUSHED |
+-------------------------------+
SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
SELECT * FROM cnt_reqs ORDER BY ts, status_code;
+-----+---------------------+-------------+
| val | ts | status_code |
+-----+---------------------+-------------+
| 3.0 | 1970-01-01T00:00:00 | 200.0 |
| 1.0 | 1970-01-01T00:00:00 | 401.0 |
| 1.0 | 1970-01-01T00:00:05 | 401.0 |
| 2.0 | 1970-01-01T00:00:05 | 404.0 |
| 1.0 | 1970-01-01T00:00:05 | 500.0 |
| 2.0 | 1970-01-01T00:00:10 | 200.0 |
| 2.0 | 1970-01-01T00:00:10 | 201.0 |
| 4.0 | 1970-01-01T00:00:15 | 500.0 |
+-----+---------------------+-------------+
+--------------------------+---------------------+-------------+
| count(http_requests.val) | ts | status_code |
+--------------------------+---------------------+-------------+
| 3.0 | 1970-01-01T00:00:00 | 200.0 |
| 1.0 | 1970-01-01T00:00:00 | 401.0 |
| 1.0 | 1970-01-01T00:00:05 | 401.0 |
| 2.0 | 1970-01-01T00:00:05 | 404.0 |
| 1.0 | 1970-01-01T00:00:05 | 500.0 |
| 2.0 | 1970-01-01T00:00:10 | 200.0 |
| 2.0 | 1970-01-01T00:00:10 | 201.0 |
| 4.0 | 1970-01-01T00:00:15 | 500.0 |
+--------------------------+---------------------+-------------+
DROP FLOW calc_reqs;
@@ -199,18 +248,24 @@ Affected Rows: 0
SHOW CREATE TABLE rate_reqs;
+-----------+------------------------------------------+
| Table | Create Table |
+-----------+------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+------------------------------------------+
+-----------+-----------------------------------------------------------+
| Table | Create Table |
+-----------+-----------------------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "prom_rate(ts_range,val,ts,Int64(300000))" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
++
++
INSERT INTO TABLE http_requests VALUES
(now() - '1m'::interval, 0),
@@ -248,3 +303,84 @@ DROP TABLE rate_reqs;
Affected Rows: 0
CREATE TABLE http_requests_total (
host STRING,
job STRING,
instance STRING,
byte DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (host, job, instance)
);
Affected Rows: 0
CREATE FLOW calc_rate
SINK TO rate_reqs
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]);
Affected Rows: 0
SHOW CREATE TABLE rate_reqs;
+-----------+-----------------------------------------------------------+
| Table | Create Table |
+-----------+-----------------------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "prom_rate(ts_range,byte,ts,Int64(60000))" DOUBLE NULL, |
| | "host" STRING NULL, |
| | "job" STRING NULL, |
| | "instance" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host", "job", "instance") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
++
++
INSERT INTO TABLE http_requests_total VALUES
('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval),
('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval),
('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval),
('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval),
('localhost', 'my_service', 'instance1', 400, now());
Affected Rows: 5
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_rate');
+-------------------------------+
| ADMIN FLUSH_FLOW('calc_rate') |
+-------------------------------+
| FLOW_FLUSHED |
+-------------------------------+
SELECT count(*)>0 FROM rate_reqs;
+---------------------+
| count(*) > Int64(0) |
+---------------------+
| true |
+---------------------+
DROP FLOW calc_rate;
Affected Rows: 0
DROP TABLE http_requests_total;
Affected Rows: 0
DROP TABLE rate_reqs;
Affected Rows: 0

View File

@@ -11,6 +11,9 @@ TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_
SHOW CREATE TABLE cnt_reqs;
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
(now() - '17s'::interval, 'host2', 'idc1', 200),
@@ -39,6 +42,28 @@ DROP FLOW calc_reqs;
DROP TABLE http_requests;
DROP TABLE cnt_reqs;
CREATE TABLE http_requests_two_vals (
ts timestamp(3) time index,
host STRING,
idc STRING,
val DOUBLE,
valb DOUBLE,
PRIMARY KEY(host, idc),
);
-- should failed with two value columns error
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests_two_vals);
-- should failed with two value columns error
-- SQLNESS REPLACE id=[0-9]+ id=[REDACTED]
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') rate(http_requests_two_vals[5m]);
SHOW TABLES;
DROP TABLE http_requests_two_vals;
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
@@ -64,6 +89,9 @@ TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("stat
SHOW CREATE TABLE cnt_reqs;
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
(0::Timestamp, 'host2', 'idc1', 200),
@@ -85,7 +113,7 @@ INSERT INTO TABLE http_requests VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_reqs');
SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
SELECT * FROM cnt_reqs ORDER BY ts, status_code;
DROP FLOW calc_reqs;
DROP TABLE http_requests;
@@ -101,6 +129,9 @@ TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]);
SHOW CREATE TABLE rate_reqs;
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
INSERT INTO TABLE http_requests VALUES
(now() - '1m'::interval, 0),
(now() - '30s'::interval, 1),
@@ -114,3 +145,38 @@ SELECT count(*) > 0 FROM rate_reqs;
DROP FLOW calc_rate;
DROP TABLE http_requests;
DROP TABLE rate_reqs;
CREATE TABLE http_requests_total (
host STRING,
job STRING,
instance STRING,
byte DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (host, job, instance)
);
CREATE FLOW calc_rate
SINK TO rate_reqs
EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests_total{job="my_service"}[1m]);
SHOW CREATE TABLE rate_reqs;
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", rate_reqs);
INSERT INTO TABLE http_requests_total VALUES
('localhost', 'my_service', 'instance1', 100, now() - '1min'::interval),
('localhost', 'my_service', 'instance1', 200, now() - '45s'::interval),
('remotehost', 'my_service', 'instance1', 300, now() - '30s'::interval),
('remotehost', 'their_service', 'instance1', 300, now() - '15s'::interval),
('localhost', 'my_service', 'instance1', 400, now());
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_rate');
SELECT count(*)>0 FROM rate_reqs;
DROP FLOW calc_rate;
DROP TABLE http_requests_total;
DROP TABLE rate_reqs;