feat: flow prom ql auto sink table is also promql-able (#6852)

* feat: flow prom ql auto sink table is also promql-able

Signed-off-by: discord9 <discord9@163.com>

* fix: gen create table expr without aggr/projection outermost

Signed-off-by: discord9 <discord9@163.com>

* test: update non-aggr testcase

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-08-29 20:16:03 +08:00
committed by GitHub
parent d585c23ba5
commit 367a25af06
8 changed files with 385 additions and 268 deletions

View File

@@ -41,8 +41,8 @@ use snafu::{OptionExt, ResultExt};
use crate::batching_mode::BatchingModeOptions;
use crate::error::{
ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu,
UnexpectedSnafu,
CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
NoAvailableFrontendSnafu, UnexpectedSnafu,
};
use crate::{Error, FlowAuthHeader};
@@ -290,13 +290,17 @@ impl FrontendClient {
) -> Result<u32, Error> {
self.handle(
Request::Ddl(api::v1::DdlRequest {
expr: Some(api::v1::ddl_request::Expr::CreateTable(create)),
expr: Some(api::v1::ddl_request::Expr::CreateTable(create.clone())),
}),
catalog,
schema,
&mut None,
)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateSinkTableSnafu {
create: create.clone(),
})
}
/// Execute a SQL statement on the frontend.

View File

@@ -17,7 +17,6 @@ use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use api::v1::CreateTableExpr;
use arrow_schema::Fields;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_query::logical_plan::breakup_insert_plan;
@@ -102,7 +101,7 @@ fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<Quer
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
/// query is a tql query
Tql,
@@ -589,7 +588,7 @@ impl BatchingTask {
let query_ctx = self.state.read().unwrap().query_ctx.clone();
let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, true).await?;
create_table_with_expr(&plan, &self.config.sink_table_name)
create_table_with_expr(&plan, &self.config.sink_table_name, &self.config.query_type)
}
/// will merge and use the first ten time window in query
@@ -731,12 +730,12 @@ impl BatchingTask {
fn create_table_with_expr(
plan: &LogicalPlan,
sink_table_name: &[String; 3],
query_type: &QueryType,
) -> Result<CreateTableExpr, Error> {
let fields = plan.schema().fields();
let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan, fields)?;
let (first_time_stamp, primary_keys) = build_primary_key_constraint(plan)?;
let mut column_schemas = Vec::new();
for field in fields {
for field in plan.schema().fields() {
let name = field.name();
let ty = ConcreteDataType::from_arrow_type(field.data_type());
let col_schema = if first_time_stamp == Some(name.clone()) {
@@ -744,15 +743,40 @@ fn create_table_with_expr(
} else {
ColumnSchema::new(name, ty, true)
};
column_schemas.push(col_schema);
match query_type {
QueryType::Sql => {
column_schemas.push(col_schema);
}
QueryType::Tql => {
// if is val column, need to rename as val DOUBLE NULL
// if is tag column, need to cast type as STRING NULL
let is_tag_column = primary_keys.contains(name);
let is_val_column = !is_tag_column && first_time_stamp.as_ref() != Some(name);
if is_val_column {
let col_schema =
ColumnSchema::new("val", ConcreteDataType::float64_datatype(), true);
column_schemas.push(col_schema);
} else if is_tag_column {
let col_schema =
ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
column_schemas.push(col_schema);
} else {
// time index column
column_schemas.push(col_schema);
}
}
}
}
let update_at_schema = ColumnSchema::new(
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
);
column_schemas.push(update_at_schema);
if query_type == &QueryType::Sql {
let update_at_schema = ColumnSchema::new(
AUTO_CREATED_UPDATE_AT_TS_COL,
ConcreteDataType::timestamp_millisecond_datatype(),
true,
);
column_schemas.push(update_at_schema);
}
let time_index = if let Some(time_index) = first_time_stamp {
time_index
@@ -793,8 +817,8 @@ fn create_table_with_expr(
/// * `Vec<String>` - other columns which are also in group by clause
fn build_primary_key_constraint(
plan: &LogicalPlan,
schema: &Fields,
) -> Result<(Option<String>, Vec<String>), Error> {
let fields = plan.schema().fields();
let mut pk_names = FindGroupByFinalName::default();
plan.visit(&mut pk_names)
@@ -802,19 +826,23 @@ fn build_primary_key_constraint(
context: format!("Can't find aggr expr in plan {plan:?}"),
})?;
// if no group by clause, return empty
// if no group by clause, return empty with first timestamp column found in output schema
let pk_final_names = pk_names.get_group_expr_names().unwrap_or_default();
if pk_final_names.is_empty() {
return Ok((None, Vec::new()));
let first_ts_col = fields
.iter()
.find(|f| ConcreteDataType::from_arrow_type(f.data_type()).is_timestamp())
.map(|f| f.name().clone());
return Ok((first_ts_col, Vec::new()));
}
let all_pk_cols: Vec<_> = schema
let all_pk_cols: Vec<_> = fields
.iter()
.filter(|f| pk_final_names.contains(f.name()))
.map(|f| f.name().clone())
.collect();
// auto create table use first timestamp column in group by clause as time index
let first_time_stamp = schema
let first_time_stamp = fields
.iter()
.find(|f| {
all_pk_cols.contains(&f.name().clone())
@@ -873,13 +901,13 @@ mod test {
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
true,
),
false,
)
.with_time_index(true),
update_at_schema.clone(),
ts_placeholder_schema.clone(),
],
primary_keys: vec![],
time_index: AUTO_CREATED_PLACEHOLDER_TS_COL.to_string(),
time_index: "ts".to_string(),
},
TestCase {
sql: "SELECT number, max(ts) FROM numbers_with_ts GROUP BY number".to_string(),
@@ -946,6 +974,7 @@ mod test {
"public".to_string(),
tc.sink_table_name.clone(),
],
&QueryType::Sql,
)
.unwrap();
// TODO(discord9): assert expr
@@ -954,9 +983,9 @@ mod test {
.iter()
.map(|c| try_as_column_schema(c).unwrap())
.collect::<Vec<_>>();
assert_eq!(tc.column_schemas, column_schemas);
assert_eq!(tc.primary_keys, expr.primary_keys);
assert_eq!(tc.time_index, expr.time_index);
assert_eq!(tc.column_schemas, column_schemas, "{:?}", tc.sql);
assert_eq!(tc.primary_keys, expr.primary_keys, "{:?}", tc.sql);
assert_eq!(tc.time_index, expr.time_index, "{:?}", tc.sql);
}
}
}

View File

@@ -258,56 +258,9 @@ impl AddAutoColumnRewriter {
is_rewritten: false,
}
}
}
impl TreeNodeRewriter for AddAutoColumnRewriter {
type Node = LogicalPlan;
fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if self.is_rewritten {
return Ok(Transformed::no(node));
}
// if is distinct all, wrap it in a projection
if let LogicalPlan::Distinct(Distinct::All(_)) = &node {
let mut exprs = vec![];
for field in node.schema().fields().iter() {
exprs.push(Expr::Column(datafusion::common::Column::new_unqualified(
field.name(),
)));
}
let projection =
LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?);
node = projection;
}
// handle table_scan by wrap it in a projection
else if let LogicalPlan::TableScan(table_scan) = node {
let mut exprs = vec![];
for field in table_scan.projected_schema.fields().iter() {
exprs.push(Expr::Column(datafusion::common::Column::new(
Some(table_scan.table_name.clone()),
field.name(),
)));
}
let projection = LogicalPlan::Projection(Projection::try_new(
exprs,
Arc::new(LogicalPlan::TableScan(table_scan)),
)?);
node = projection;
}
// only do rewrite if found the outermost projection
let mut exprs = if let LogicalPlan::Projection(project) = &node {
project.expr.clone()
} else {
return Ok(Transformed::no(node));
};
/// modify the exprs in place so that it matches the schema and some auto columns are added
fn modify_project_exprs(&mut self, mut exprs: Vec<Expr>) -> DfResult<Vec<Expr>> {
let all_names = self
.schema
.column_schemas()
@@ -391,10 +344,76 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
query_col_cnt, exprs, table_col_cnt, self.schema.column_schemas()
)));
}
Ok(exprs)
}
}
self.is_rewritten = true;
let new_plan = node.with_new_exprs(exprs, node.inputs().into_iter().cloned().collect())?;
Ok(Transformed::yes(new_plan))
impl TreeNodeRewriter for AddAutoColumnRewriter {
type Node = LogicalPlan;
fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if self.is_rewritten {
return Ok(Transformed::no(node));
}
// if is distinct all, wrap it in a projection
if let LogicalPlan::Distinct(Distinct::All(_)) = &node {
let mut exprs = vec![];
for field in node.schema().fields().iter() {
exprs.push(Expr::Column(datafusion::common::Column::new_unqualified(
field.name(),
)));
}
let projection =
LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?);
node = projection;
}
// handle table_scan by wrap it in a projection
else if let LogicalPlan::TableScan(table_scan) = node {
let mut exprs = vec![];
for field in table_scan.projected_schema.fields().iter() {
exprs.push(Expr::Column(datafusion::common::Column::new(
Some(table_scan.table_name.clone()),
field.name(),
)));
}
let projection = LogicalPlan::Projection(Projection::try_new(
exprs,
Arc::new(LogicalPlan::TableScan(table_scan)),
)?);
node = projection;
}
// only do rewrite if found the outermost projection
// if the outermost node is projection, can rewrite the exprs
// if not, wrap it in a projection
if let LogicalPlan::Projection(project) = &node {
let exprs = project.expr.clone();
let exprs = self.modify_project_exprs(exprs)?;
self.is_rewritten = true;
let new_plan =
node.with_new_exprs(exprs, node.inputs().into_iter().cloned().collect())?;
Ok(Transformed::yes(new_plan))
} else {
// wrap the logical plan in a projection
let mut exprs = vec![];
for field in node.schema().fields().iter() {
exprs.push(Expr::Column(datafusion::common::Column::new_unqualified(
field.name(),
)));
}
let exprs = self.modify_project_exprs(exprs)?;
self.is_rewritten = true;
let new_plan =
LogicalPlan::Projection(Projection::try_new(exprs, Arc::new(node.clone()))?);
Ok(Transformed::yes(new_plan))
}
}
/// We might add new columns, so we need to recompute the schema

View File

@@ -16,6 +16,7 @@
use std::any::Any;
use api::v1::CreateTableExpr;
use arrow_schema::ArrowError;
use common_error::ext::BoxedError;
use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
@@ -60,6 +61,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Error encountered while creating sink table for flow: {create:?}"))]
CreateSinkTable {
create: CreateTableExpr,
source: BoxedError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Time error"))]
Time {
source: common_time::error::Error,
@@ -331,9 +340,10 @@ impl ErrorExt for Error {
| Self::ListFlows { .. } => StatusCode::TableNotFound,
Self::FlowNotFound { .. } => StatusCode::FlowNotFound,
Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
Self::CreateFlow { .. } | Self::Arrow { .. } | Self::Time { .. } => {
StatusCode::EngineExecuteQuery
}
Self::CreateFlow { .. }
| Self::CreateSinkTable { .. }
| Self::Arrow { .. }
| Self::Time { .. } => StatusCode::EngineExecuteQuery,
Self::Unexpected { .. }
| Self::SyncCheckTask { .. }
| Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected,

View File

@@ -2,7 +2,7 @@ CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
val DOUBLE,
PRIMARY KEY(host, idc),
);
@@ -15,21 +15,20 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" BIGINT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
@@ -85,7 +84,7 @@ CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
val DOUBLE,
PRIMARY KEY(host, idc),
);
@@ -115,21 +114,20 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" BIGINT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
@@ -160,20 +158,20 @@ ADMIN FLUSH_FLOW('calc_reqs');
| FLOW_FLUSHED |
+-------------------------------+
SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
+--------------------------+---------------------+-------------+
| count(http_requests.val) | ts | status_code |
+--------------------------+---------------------+-------------+
| 3 | 1970-01-01T00:00:00 | 200 |
| 1 | 1970-01-01T00:00:00 | 401 |
| 1 | 1970-01-01T00:00:05 | 401 |
| 2 | 1970-01-01T00:00:05 | 404 |
| 1 | 1970-01-01T00:00:05 | 500 |
| 2 | 1970-01-01T00:00:10 | 200 |
| 2 | 1970-01-01T00:00:10 | 201 |
| 4 | 1970-01-01T00:00:15 | 500 |
+--------------------------+---------------------+-------------+
+-----+---------------------+-------------+
| val | ts | status_code |
+-----+---------------------+-------------+
| 3.0 | 1970-01-01T00:00:00 | 200.0 |
| 1.0 | 1970-01-01T00:00:00 | 401.0 |
| 1.0 | 1970-01-01T00:00:05 | 401.0 |
| 2.0 | 1970-01-01T00:00:05 | 404.0 |
| 1.0 | 1970-01-01T00:00:05 | 500.0 |
| 2.0 | 1970-01-01T00:00:10 | 200.0 |
| 2.0 | 1970-01-01T00:00:10 | 201.0 |
| 4.0 | 1970-01-01T00:00:15 | 500.0 |
+-----+---------------------+-------------+
DROP FLOW calc_reqs;
@@ -187,3 +185,66 @@ DROP TABLE cnt_reqs;
Affected Rows: 0
CREATE TABLE http_requests (
ts timestamp(3) time index,
val DOUBLE,
);
Affected Rows: 0
CREATE FLOW calc_rate SINK TO rate_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]);
Affected Rows: 0
SHOW CREATE TABLE rate_reqs;
+-----------+------------------------------------------+
| Table | Create Table |
+-----------+------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+------------------------------------------+
INSERT INTO TABLE http_requests VALUES
(now() - '1m'::interval, 0),
(now() - '30s'::interval, 1),
(now(), 2);
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_rate');
+-------------------------------+
| ADMIN FLUSH_FLOW('calc_rate') |
+-------------------------------+
| FLOW_FLUSHED |
+-------------------------------+
SELECT count(*) > 0 FROM rate_reqs;
+---------------------+
| count(*) > Int64(0) |
+---------------------+
| true |
+---------------------+
DROP FLOW calc_rate;
Affected Rows: 0
DROP TABLE http_requests;
Affected Rows: 0
DROP TABLE rate_reqs;
Affected Rows: 0

View File

@@ -1,92 +0,0 @@
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests);
SHOW CREATE TABLE cnt_reqs;
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
(now() - '17s'::interval, 'host2', 'idc1', 200),
(now() - '17s'::interval, 'host3', 'idc2', 200),
(now() - '17s'::interval, 'host4', 'idc2', 401),
(now() - '13s'::interval, 'host1', 'idc1', 404),
(now() - '13s'::interval, 'host2', 'idc1', 401),
(now() - '13s'::interval, 'host3', 'idc2', 404),
(now() - '13s'::interval, 'host4', 'idc2', 500),
(now() - '7s'::interval, 'host1', 'idc1', 200),
(now() - '7s'::interval, 'host2', 'idc1', 200),
(now() - '7s'::interval, 'host3', 'idc2', 201),
(now() - '7s'::interval, 'host4', 'idc2', 201),
(now() - '3s'::interval, 'host1', 'idc1', 500),
(now() - '3s'::interval, 'host2', 'idc1', 500),
(now() - '3s'::interval, 'host3', 'idc2', 500),
(now() - '3s'::interval, 'host4', 'idc2', 500);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_reqs');
-- too much indeterminsticity in the test, so just check that the flow is running
SELECT count(*) > 0 FROM cnt_reqs;
DROP FLOW calc_reqs;
DROP TABLE http_requests;
DROP TABLE cnt_reqs;
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);
-- standalone&distributed have slightly different error message(distributed will print source error as well ("cannot convert float seconds to Duration: value is negative"))
-- so duplicate test into two
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()+'15s'::interval), '5s') count_values("status_code", http_requests);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-now()+'15s'::interval, '5s') count_values("status_code", http_requests);
CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - now(), now()-(now()-'15s'::interval), '5s') count_values("status_code", http_requests);
SHOW CREATE TABLE cnt_reqs;
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
(0::Timestamp, 'host2', 'idc1', 200),
(0::Timestamp, 'host3', 'idc2', 200),
(0::Timestamp, 'host4', 'idc2', 401),
(5000::Timestamp, 'host1', 'idc1', 404),
(5000::Timestamp, 'host2', 'idc1', 401),
(5000::Timestamp, 'host3', 'idc2', 404),
(5000::Timestamp, 'host4', 'idc2', 500),
(10000::Timestamp, 'host1', 'idc1', 200),
(10000::Timestamp, 'host2', 'idc1', 200),
(10000::Timestamp, 'host3', 'idc2', 201),
(10000::Timestamp, 'host4', 'idc2', 201),
(15000::Timestamp, 'host1', 'idc1', 500),
(15000::Timestamp, 'host2', 'idc1', 500),
(15000::Timestamp, 'host3', 'idc2', 500),
(15000::Timestamp, 'host4', 'idc2', 500);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_reqs');
SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
DROP FLOW calc_reqs;
DROP TABLE http_requests;
DROP TABLE cnt_reqs;

View File

@@ -0,0 +1 @@
../../standalone/flow-tql/flow_tql.sql

View File

@@ -2,7 +2,7 @@ CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
val DOUBLE,
PRIMARY KEY(host, idc),
);
@@ -15,21 +15,20 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" BIGINT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
INSERT INTO TABLE http_requests VALUES
(now() - '17s'::interval, 'host1', 'idc1', 200),
@@ -85,7 +84,7 @@ CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
val DOUBLE,
PRIMARY KEY(host, idc),
);
@@ -115,21 +114,20 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" BIGINT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
+----------+-----------------------------------------+
| Table | Create Table |
+----------+-----------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "status_code" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("status_code") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-----------------------------------------+
INSERT INTO TABLE http_requests VALUES
(0::Timestamp, 'host1', 'idc1', 200),
@@ -160,20 +158,20 @@ ADMIN FLUSH_FLOW('calc_reqs');
| FLOW_FLUSHED |
+-------------------------------+
SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
+--------------------------+---------------------+-------------+
| count(http_requests.val) | ts | status_code |
+--------------------------+---------------------+-------------+
| 3 | 1970-01-01T00:00:00 | 200 |
| 1 | 1970-01-01T00:00:00 | 401 |
| 1 | 1970-01-01T00:00:05 | 401 |
| 2 | 1970-01-01T00:00:05 | 404 |
| 1 | 1970-01-01T00:00:05 | 500 |
| 2 | 1970-01-01T00:00:10 | 200 |
| 2 | 1970-01-01T00:00:10 | 201 |
| 4 | 1970-01-01T00:00:15 | 500 |
+--------------------------+---------------------+-------------+
+-----+---------------------+-------------+
| val | ts | status_code |
+-----+---------------------+-------------+
| 3.0 | 1970-01-01T00:00:00 | 200.0 |
| 1.0 | 1970-01-01T00:00:00 | 401.0 |
| 1.0 | 1970-01-01T00:00:05 | 401.0 |
| 2.0 | 1970-01-01T00:00:05 | 404.0 |
| 1.0 | 1970-01-01T00:00:05 | 500.0 |
| 2.0 | 1970-01-01T00:00:10 | 200.0 |
| 2.0 | 1970-01-01T00:00:10 | 201.0 |
| 4.0 | 1970-01-01T00:00:15 | 500.0 |
+-----+---------------------+-------------+
DROP FLOW calc_reqs;
@@ -187,3 +185,66 @@ DROP TABLE cnt_reqs;
Affected Rows: 0
CREATE TABLE http_requests (
ts timestamp(3) time index,
val DOUBLE,
);
Affected Rows: 0
CREATE FLOW calc_rate SINK TO rate_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]);
Affected Rows: 0
SHOW CREATE TABLE rate_reqs;
+-----------+------------------------------------------+
| Table | Create Table |
+-----------+------------------------------------------+
| rate_reqs | CREATE TABLE IF NOT EXISTS "rate_reqs" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+------------------------------------------+
INSERT INTO TABLE http_requests VALUES
(now() - '1m'::interval, 0),
(now() - '30s'::interval, 1),
(now(), 2);
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_rate');
+-------------------------------+
| ADMIN FLUSH_FLOW('calc_rate') |
+-------------------------------+
| FLOW_FLUSHED |
+-------------------------------+
SELECT count(*) > 0 FROM rate_reqs;
+---------------------+
| count(*) > Int64(0) |
+---------------------+
| true |
+---------------------+
DROP FLOW calc_rate;
Affected Rows: 0
DROP TABLE http_requests;
Affected Rows: 0
DROP TABLE rate_reqs;
Affected Rows: 0

View File

@@ -2,7 +2,7 @@ CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
val DOUBLE,
PRIMARY KEY(host, idc),
);
@@ -43,7 +43,7 @@ CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
val DOUBLE,
PRIMARY KEY(host, idc),
);
@@ -85,8 +85,32 @@ INSERT INTO TABLE http_requests VALUES
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_reqs');
SELECT "count(http_requests.val)", ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
SELECT val, ts, status_code FROM cnt_reqs ORDER BY ts, status_code;
DROP FLOW calc_reqs;
DROP TABLE http_requests;
DROP TABLE cnt_reqs;
CREATE TABLE http_requests (
ts timestamp(3) time index,
val DOUBLE,
);
CREATE FLOW calc_rate SINK TO rate_reqs EVAL INTERVAL '1m' AS
TQL EVAL (now() - '1m'::interval, now(), '30s') rate(http_requests[5m]);
SHOW CREATE TABLE rate_reqs;
INSERT INTO TABLE http_requests VALUES
(now() - '1m'::interval, 0),
(now() - '30s'::interval, 1),
(now(), 2);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('calc_rate');
SELECT count(*) > 0 FROM rate_reqs;
DROP FLOW calc_rate;
DROP TABLE http_requests;
DROP TABLE rate_reqs;