feat: flow last non null (#7646)

* feat: flow last non null support

Signed-off-by: discord9 <discord9@163.com>

* clippy

Signed-off-by: discord9 <discord9@163.com>

* test: report error when sink is not last non null

Signed-off-by: discord9 <discord9@163.com>

* fix: error if query column matches nothing if partial

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-01-30 20:34:33 +08:00
committed by GitHub
parent 81169e4a22
commit a6e69bc180
4 changed files with 456 additions and 8 deletions

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeSet, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -37,6 +37,7 @@ use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use store_api::mito_engine_options::MERGE_MODE_KEY;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
use tokio::sync::oneshot;
@@ -101,6 +102,13 @@ fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<Quer
}
}
/// Tells whether the given table options select the `last_non_null` merge mode.
///
/// The value stored under `MERGE_MODE_KEY` is compared ASCII case-insensitively;
/// when the key is absent the answer is `false`.
fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
    let configured_mode = options.get(MERGE_MODE_KEY);
    configured_mode.is_some_and(|mode| mode.eq_ignore_ascii_case("last_non_null"))
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryType {
/// query is a tql query
@@ -260,10 +268,17 @@ impl BatchingTask {
)
.await?;
let table_meta = &table.table_info().meta;
let merge_mode_last_non_null =
is_merge_mode_last_non_null(&table_meta.options.extra_options);
let primary_key_indices = table_meta.primary_key_indices.clone();
let new_query = self
.gen_query_with_time_window(
engine.clone(),
&table.table_info().meta.schema,
&primary_key_indices,
merge_mode_last_non_null,
max_window_cnt,
)
.await?;
@@ -596,6 +611,8 @@ impl BatchingTask {
&self,
engine: QueryEngineRef,
sink_table_schema: &Arc<Schema>,
primary_key_indices: &[usize],
allow_partial: bool,
max_window_cnt: Option<usize>,
) -> Result<Option<PlanInfo>, Error> {
let query_ctx = self.state.read().unwrap().query_ctx.clone();
@@ -643,6 +660,8 @@ impl BatchingTask {
query_ctx,
engine,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
)
.await?;
@@ -657,6 +676,8 @@ impl BatchingTask {
query_ctx,
engine,
sink_table_schema.clone(),
primary_key_indices,
allow_partial,
)
.await?;
@@ -725,7 +746,11 @@ impl BatchingTask {
};
let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema.clone());
let mut add_auto_column = ColumnMatcherRewriter::new(
sink_table_schema.clone(),
primary_key_indices.to_vec(),
allow_partial,
);
let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;

View File

@@ -14,7 +14,7 @@
//! some utils for helping with batching mode
use std::collections::{BTreeSet, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use catalog::CatalogManagerRef;
@@ -28,7 +28,7 @@ use datafusion_common::tree_node::{
};
use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
use datafusion_expr::{Distinct, LogicalPlan, Projection};
use datatypes::schema::SchemaRef;
use datatypes::schema::{ColumnSchema, SchemaRef};
use query::QueryEngineRef;
use query::parser::{DEFAULT_LOOKBACK_STRING, PromQuery, QueryLanguageParser, QueryStatement};
use session::context::QueryContextRef;
@@ -38,7 +38,7 @@ use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
use table::TableRef;
use crate::adapter::AUTO_CREATED_PLACEHOLDER_TS_COL;
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, TableNotFoundSnafu};
use crate::{Error, TableName};
@@ -143,10 +143,16 @@ pub(crate) async fn gen_plan_with_matching_schema(
query_ctx: QueryContextRef,
engine: QueryEngineRef,
sink_table_schema: SchemaRef,
primary_key_indices: &[usize],
allow_partial: bool,
) -> Result<LogicalPlan, Error> {
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), sql, false).await?;
let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema);
let mut add_auto_column = ColumnMatcherRewriter::new(
sink_table_schema,
primary_key_indices.to_vec(),
allow_partial,
);
let plan = plan
.clone()
.rewrite(&mut add_auto_column)
@@ -271,18 +277,26 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
pub struct ColumnMatcherRewriter {
/// Schema the rewritten projection is matched against; callers in this file pass the
/// sink table's schema.
pub schema: SchemaRef,
/// Starts as `false` in `new`. NOTE(review): presumably flipped once the plan has been
/// rewritten — the code that sets it is not visible in this hunk, confirm there.
pub is_rewritten: bool,
/// Positions (into `schema.column_schemas()`) of the sink table's primary key columns;
/// these columns are required in the query output on the partial path.
pub primary_key_indices: Vec<usize>,
/// When `true`, `modify_project_exprs` delegates to `modify_project_exprs_with_partial`,
/// which lets the query produce only a subset of the sink columns.
pub allow_partial: bool,
}
impl ColumnMatcherRewriter {
pub fn new(schema: SchemaRef) -> Self {
pub fn new(schema: SchemaRef, primary_key_indices: Vec<usize>, allow_partial: bool) -> Self {
Self {
schema,
is_rewritten: false,
primary_key_indices,
allow_partial,
}
}
/// modify the exprs in place so that it matches the schema and some auto columns are added
fn modify_project_exprs(&mut self, mut exprs: Vec<Expr>) -> DfResult<Vec<Expr>> {
if self.allow_partial {
return self.modify_project_exprs_with_partial(exprs);
}
let all_names = self
.schema
.column_schemas()
@@ -368,6 +382,105 @@ impl ColumnMatcherRewriter {
}
Ok(exprs)
}
/// Rewrites the projection `exprs` so they line up with the sink table schema when the
/// query only produces a subset of the sink columns (the `allow_partial` path).
///
/// The result holds exactly one expression per sink column, in schema order:
/// - a column produced by the query keeps the query's expression;
/// - the auto created placeholder time index is filled with a constant `0` millisecond
///   timestamp;
/// - the auto created update-at column (when it is a timestamp) is filled with `now()`;
/// - every other column is filled with a `NULL` literal aliased to the column name.
///
/// # Errors
///
/// Returns a `DataFusionError::Plan` when
/// - the query has more columns than the sink table,
/// - two output expressions share the same unqualified name (one of them would otherwise
///   be dropped silently),
/// - a column listed by `required_columns_for_partial` is missing from the output, or
/// - the query outputs a column that does not exist in the sink schema.
///
/// Column names inside error messages are sorted so the message is deterministic.
fn modify_project_exprs_with_partial(&mut self, exprs: Vec<Expr>) -> DfResult<Vec<Expr>> {
    let table_col_cnt = self.schema.column_schemas().len();
    let query_col_cnt = exprs.len();

    if query_col_cnt > table_col_cnt {
        return Err(DataFusionError::Plan(format!(
            "Expect query column count <= table column count, found {} query columns {:?}, {} table columns {:?}",
            query_col_cnt,
            exprs,
            table_col_cnt,
            self.schema.column_schemas()
        )));
    }

    // Index the query output by unqualified name. `exprs` is consumed here, no clone needed.
    let mut name_to_expr: HashMap<String, Expr> = HashMap::with_capacity(query_col_cnt);
    for expr in exprs {
        match name_to_expr.entry(expr.qualified_name().1) {
            // Two outputs with the same name cannot both be mapped onto one sink column.
            std::collections::hash_map::Entry::Occupied(entry) => {
                return Err(DataFusionError::Plan(format!(
                    "Flow output has duplicate column name {:?} when merge_mode=last_non_null",
                    entry.key()
                )));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert(expr);
            }
        }
    }

    // Hash set iteration order is arbitrary, so sort before reporting.
    let mut missing: Vec<String> = self
        .required_columns_for_partial()
        .into_iter()
        .filter(|name| !name_to_expr.contains_key(name))
        .collect();
    if !missing.is_empty() {
        missing.sort_unstable();
        return Err(DataFusionError::Plan(format!(
            "Column(s) {:?} required by sink table are missing from flow output when merge_mode=last_non_null",
            missing
        )));
    }

    let timestamp_index = self.schema.timestamp_index();
    let mut new_exprs = Vec::with_capacity(table_col_cnt);

    for (idx, col_schema) in self.schema.column_schemas().iter().enumerate() {
        let col_name = col_schema.name.as_str();

        // The map key is the expression's own output name, so a hit already carries the
        // right name and needs no re-aliasing.
        if let Some(expr) = name_to_expr.remove(col_name) {
            new_exprs.push(expr);
            continue;
        }

        if col_name == AUTO_CREATED_PLACEHOLDER_TS_COL && timestamp_index == Some(idx) {
            new_exprs.push(
                datafusion::logical_expr::lit(ScalarValue::TimestampMillisecond(Some(0), None))
                    .alias(AUTO_CREATED_PLACEHOLDER_TS_COL),
            );
            continue;
        }

        if col_name == AUTO_CREATED_UPDATE_AT_TS_COL && col_schema.data_type.is_timestamp() {
            new_exprs.push(datafusion::prelude::now().alias(col_name));
            continue;
        }

        // Not produced by the query: write NULL so last_non_null keeps the stored value.
        new_exprs.push(Self::null_expr(col_schema));
    }

    // Whatever is left did not match any sink column.
    if !name_to_expr.is_empty() {
        let mut extra: Vec<String> = name_to_expr.into_keys().collect();
        extra.sort_unstable();
        return Err(DataFusionError::Plan(format!(
            "Flow output has extra column(s) {:?} not found in sink schema when merge_mode=last_non_null",
            extra
        )));
    }

    Ok(new_exprs)
}
/// Builds an untyped `NULL` literal that is aliased to the name of `col_schema`.
fn null_expr(col_schema: &ColumnSchema) -> Expr {
    let untyped_null = Expr::Literal(ScalarValue::Null, None);
    let alias = col_schema.name.clone();
    untyped_null.alias(alias)
}
fn required_columns_for_partial(&self) -> HashSet<String> {
let mut required = HashSet::new();
for idx in &self.primary_key_indices {
if let Some(col) = self.schema.column_schemas().get(*idx) {
required.insert(col.name.clone());
}
}
if let Some(ts_idx) = self.schema.timestamp_index()
&& let Some(col) = self.schema.column_schemas().get(ts_idx)
&& col.name != AUTO_CREATED_PLACEHOLDER_TS_COL
{
required.insert(col.name.clone());
}
required
}
}
impl TreeNodeRewriter for ColumnMatcherRewriter {
@@ -721,7 +834,8 @@ mod test {
let ctx = QueryContext::arc();
for (before, after, column_schemas) in testcases {
let schema = Arc::new(Schema::new(column_schemas));
let mut add_auto_column_rewriter = ColumnMatcherRewriter::new(schema);
let mut add_auto_column_rewriter =
ColumnMatcherRewriter::new(schema, Vec::new(), false);
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false)
.await

View File

@@ -0,0 +1,201 @@
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
-- TODO(discord9): remove this after auto infer table's time index is impl
CREATE TABLE approx_rate (
rate DOUBLE NULL,
time_window TIMESTAMP,
update_at TIMESTAMP,
bb DOUBLE NULL,
TIME INDEX(time_window)
)with('merge_mode'='last_non_null');
Affected Rows: 0
INSERT INTO approx_rate(rate, time_window, update_at) VALUES (0.0, '2023-01-01 00:00:00', TIMESTAMP '2023-01-01 00:00:00.100');
Affected Rows: 1
INSERT INTO approx_rate(time_window, update_at, bb) VALUES ('2023-01-01 00:00:00', TIMESTAMP '2023-01-01 00:00:00.200', 50.0);
Affected Rows: 1
select * from approx_rate;
+------+---------------------+-------------------------+------+
| rate | time_window | update_at | bb |
+------+---------------------+-------------------------+------+
| 0.0 | 2023-01-01T00:00:00 | 2023-01-01T00:00:00.200 | 50.0 |
+------+---------------------+-------------------------+------+
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window,
TIMESTAMP '2023-01-01 00:00:10' as update_at
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
SHOW CREATE TABLE approx_rate;
+-------------+--------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( |
| | "rate" DOUBLE NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "bb" DOUBLE NULL, |
| | TIME INDEX ("time_window") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | merge_mode = 'last_non_null' |
| | ) |
+-------------+--------------------------------------------+
INSERT INTO
bytes_log
VALUES
(NULL, '2023-01-01 00:00:01'),
(300, '2023-01-01 00:00:31');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
+--------------------------------------+
| ADMIN FLUSH_FLOW('find_approx_rate') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT * FROM approx_rate;
+------+---------------------+---------------------+------+
| rate | time_window | update_at | bb |
+------+---------------------+---------------------+------+
| 0.0 | 2023-01-01T00:00:00 | 2023-01-01T00:00:10 | 50.0 |
| 0.0 | 2023-01-01T00:00:30 | 2023-01-01T00:00:10 | |
+------+---------------------+---------------------+------+
CREATE FLOW find_bb_only SINK TO approx_rate AS
SELECT
date_bin(INTERVAL '30 second', ts) as time_window,
TIMESTAMP '2023-01-01 00:00:59' as update_at,
CAST(max(byte) AS DOUBLE) as bb
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
-- make new windows dirty after flow is created
INSERT INTO bytes_log VALUES (600, '2023-01-01 00:00:10'), (320, '2023-01-01 00:00:35');
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_bb_only');
+----------------------------------+
| ADMIN FLUSH_FLOW('find_bb_only') |
+----------------------------------+
| FLOW_FLUSHED |
+----------------------------------+
SELECT * FROM approx_rate;
+------+---------------------+---------------------+-------+
| rate | time_window | update_at | bb |
+------+---------------------+---------------------+-------+
| 0.0 | 2023-01-01T00:00:00 | 2023-01-01T00:00:59 | 600.0 |
| 0.0 | 2023-01-01T00:00:30 | 2023-01-01T00:00:59 | 320.0 |
+------+---------------------+---------------------+-------+
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP FLOW find_bb_only;
Affected Rows: 0
DROP TABLE bytes_log;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
Affected Rows: 0
-- TODO(discord9): remove this after auto infer table's time index is impl
CREATE TABLE approx_rate (
rate DOUBLE NULL,
time_window TIMESTAMP,
update_at TIMESTAMP,
bb DOUBLE NULL,
TIME INDEX(time_window)
);
Affected Rows: 0
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window,
TIMESTAMP '2023-01-01 00:00:10' as update_at
from
bytes_log
GROUP BY
time_window;
Affected Rows: 0
INSERT INTO
bytes_log
VALUES
(NULL, '2023-01-01 00:00:01'),
(300, '2023-01-01 00:00:31');
Affected Rows: 2
-- should return error
ADMIN FLUSH_FLOW('find_approx_rate');
Error: 1002(Unexpected), Failed to execute admin function: Failed to execute admin function flush_flow, error: Execution error: Function execution error: Internal error: 1003: Execution error: Function execution error: Internal error: 1003
DROP FLOW find_approx_rate;
Affected Rows: 0
DROP TABLE bytes_log;
Affected Rows: 0
DROP TABLE approx_rate;
Affected Rows: 0

View File

@@ -0,0 +1,108 @@
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
-- TODO(discord9): remove this after auto infer table's time index is impl
CREATE TABLE approx_rate (
rate DOUBLE NULL,
time_window TIMESTAMP,
update_at TIMESTAMP,
bb DOUBLE NULL,
TIME INDEX(time_window)
)with('merge_mode'='last_non_null');
INSERT INTO approx_rate(rate, time_window, update_at) VALUES (0.0, '2023-01-01 00:00:00', TIMESTAMP '2023-01-01 00:00:00.100');
INSERT INTO approx_rate(time_window, update_at, bb) VALUES ('2023-01-01 00:00:00', TIMESTAMP '2023-01-01 00:00:00.200', 50.0);
select * from approx_rate;
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window,
TIMESTAMP '2023-01-01 00:00:10' as update_at
from
bytes_log
GROUP BY
time_window;
SHOW CREATE TABLE approx_rate;
INSERT INTO
bytes_log
VALUES
(NULL, '2023-01-01 00:00:01'),
(300, '2023-01-01 00:00:31');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_approx_rate');
SELECT * FROM approx_rate;
CREATE FLOW find_bb_only SINK TO approx_rate AS
SELECT
date_bin(INTERVAL '30 second', ts) as time_window,
TIMESTAMP '2023-01-01 00:00:59' as update_at,
CAST(max(byte) AS DOUBLE) as bb
from
bytes_log
GROUP BY
time_window;
-- make new windows dirty after flow is created
INSERT INTO bytes_log VALUES (600, '2023-01-01 00:00:10'), (320, '2023-01-01 00:00:35');
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('find_bb_only');
SELECT * FROM approx_rate;
DROP FLOW find_approx_rate;
DROP FLOW find_bb_only;
DROP TABLE bytes_log;
DROP TABLE approx_rate;
CREATE TABLE bytes_log (
byte INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- event time
TIME INDEX(ts)
);
-- TODO(discord9): remove this after auto infer table's time index is impl
CREATE TABLE approx_rate (
rate DOUBLE NULL,
time_window TIMESTAMP,
update_at TIMESTAMP,
bb DOUBLE NULL,
TIME INDEX(time_window)
);
CREATE FLOW find_approx_rate SINK TO approx_rate AS
SELECT
(max(byte) - min(byte)) / 30.0 as rate,
date_bin(INTERVAL '30 second', ts) as time_window,
TIMESTAMP '2023-01-01 00:00:10' as update_at
from
bytes_log
GROUP BY
time_window;
INSERT INTO
bytes_log
VALUES
(NULL, '2023-01-01 00:00:01'),
(300, '2023-01-01 00:00:31');
-- should return error
ADMIN FLUSH_FLOW('find_approx_rate');
DROP FLOW find_approx_rate;
DROP TABLE bytes_log;
DROP TABLE approx_rate;