From e8660a6f7e962f2e1b742c79af60d249634def53 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 22 May 2024 17:50:26 +0800 Subject: [PATCH] fix: expire state --- src/flow/src/compute/render/reduce.rs | 20 ++++++++++++- src/flow/src/expr/error.rs | 7 +++++ src/flow/src/expr/scalar.rs | 2 ++ src/flow/src/transform/aggr.rs | 25 +++++++++------- src/flow/src/utils.rs | 41 ++++++++++++++++++++++----- 5 files changed, 76 insertions(+), 19 deletions(-) diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index e46f8c2bed..7a443e9991 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -26,7 +26,7 @@ use crate::adapter::error::{Error, PlanSnafu}; use crate::compute::render::{Context, SubgraphArg}; use crate::compute::state::Scheduler; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; -use crate::expr::error::{DataTypeSnafu, InternalSnafu}; +use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu}; use crate::expr::{AggregateExpr, EvalError, ScalarExpr}; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row}; @@ -397,6 +397,24 @@ fn reduce_accum_subgraph( // TODO(discord9): consider key-based lock let mut arrange = arrange.write(); for (key, value_diffs) in key_to_vals { + if let Some(expire_man) = &arrange.get_expire_state() { + let mut is_expired = false; + err_collector.run(|| { + if let Some(expired) = expire_man.get_expire_duration(now, &key)? { + is_expired = true; + DataAlreadyExpiredSnafu { + expired_by: expired, + } + .fail() + } else { + Ok(()) + } + }); + if is_expired { + // errors already collected, we can just continue to next key + continue; + } + } let col_diffs = { let row_len = value_diffs[0].0.len(); let res = err_collector.run(|| get_col_diffs(value_diffs, row_len)); diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 5a28234239..09ad758056 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -100,4 +100,11 @@ pub enum EvalError { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Incoming data already expired by {} ms", expired_by))] + DataAlreadyExpired { + expired_by: i64, + #[snafu(implicit)] + location: Location, + }, } diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index dfd5fcd0f2..984d6f1a44 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -45,6 +45,8 @@ impl TypedExpr { impl TypedExpr { /// expand multi-value expression to multiple expressions with new indices + /// + /// Currently it just mean expand `TumbleWindow` to `TumbleWindowFloor` and `TumbleWindowCeiling` pub fn expand_multi_value( input_typ: &RelationType, exprs: &[TypedExpr], diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 67f0ac588a..1217281b1b 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -434,9 +434,10 @@ mod test { use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + /// TODO(discord9): add more illegal sql tests #[tokio::test] - async fn test_tumble_compsite() { + async fn test_tumble_composite() { let engine = create_test_query_engine(); let sql = "SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; @@ -469,12 +470,6 @@ mod test { els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())), }; let expected = TypedPlan { - typ: RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), false), // number - ColumnType::new(CDT::uint64_datatype(), true), // sum(number) - ColumnType::new(CDT::datetime_datatype(), false), // window start - ColumnType::new(CDT::datetime_datatype(), false), // window end - ]), // TODO(discord9): mfp indirectly ref to key columns /* .with_key(vec![1]) @@ -536,11 +531,13 @@ mod test { } .with_types( RelationType::new(vec![ + // keys ColumnType::new(CDT::datetime_datatype(), false), // window start(time index) ColumnType::new(CDT::datetime_datatype(), false), // window end(pk) ColumnType::new(CDT::uint32_datatype(), false), // number(pk) - ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number) - ColumnType::new(CDT::uint64_datatype(), true), // avg.count(number) + // values + ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number) + ColumnType::new(CDT::uint64_datatype(), true), // avg.count(number) ]) .with_key(vec![1, 2]) .with_time_index(Some(0)), @@ -548,8 +545,8 @@ mod test { ), mfp: MapFilterProject::new(5) .map(vec![ - avg_expr, - ScalarExpr::Column(2), + avg_expr, // avg(number) + ScalarExpr::Column(2), // number(pk) ScalarExpr::Column(5), ScalarExpr::Column(0), ScalarExpr::Column(1), @@ -558,6 +555,12 @@ mod test { .project(vec![6, 7, 8, 9]) .unwrap(), }, + typ: RelationType::new(vec![ + ColumnType::new(CDT::uint32_datatype(), false), // number + ColumnType::new(CDT::uint64_datatype(), true), // avg(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]), }; assert_eq!(flow_plan, expected); } diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 93edf176e7..ea1885e506 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::ops::Bound; use std::sync::Arc; +use common_telemetry::debug; use itertools::Itertools; use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; @@ -86,7 +87,7 @@ impl KeyExpiryManager { /// /// - If given key is expired by now (that is less than `now - expiry_duration`), return the amount of time it's expired. /// - If it's not expired, return None - pub fn update_event_ts( + pub fn get_expire_duration_and_update_event_ts( &mut self, now: Timestamp, row: &Row, @@ -95,6 +96,26 @@ impl KeyExpiryManager { return Ok(None); }; + self.event_ts_to_key + .entry(event_ts) + .or_default() + .insert(row.clone()); + + self.get_expire_duration(now, row) + } + + /// Get the expire duration of a key, if it's expired by now. + /// + /// Return None if the key is not expired + pub fn get_expire_duration( + &self, + now: Timestamp, + row: &Row, + ) -> Result, EvalError> { + let Some(event_ts) = self.extract_event_ts(row)? else { + return Ok(None); + }; + if let Some(expire_time) = self.compute_expiration_timestamp(now) { if expire_time > event_ts { // return how much time it's expired @@ -102,10 +123,6 @@ impl KeyExpiryManager { } } - self.event_ts_to_key - .entry(event_ts) - .or_default() - .insert(row.clone()); Ok(None) } @@ -189,6 +206,10 @@ impl Arrangement { } } + pub fn get_expire_state(&self) -> Option<&KeyExpiryManager> { + self.expire_state.as_ref() + } + pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) { self.expire_state = Some(expire_state); } @@ -208,8 +229,12 @@ impl Arrangement { for ((key, val), update_ts, diff) in updates { // check if the key is expired if let Some(s) = &mut self.expire_state { - if let Some(expired_by) = s.update_event_ts(now, &key)? { + if let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)? { max_expired_by = max_expired_by.max(Some(expired_by)); + debug!( + "Expired key: {:?}, expired by: {:?} with time being now={}", + key, expired_by, now + ); continue; } } @@ -335,7 +360,9 @@ impl Arrangement { for (key, updates) in batch { // check if the key is expired if let Some(s) = &mut self.expire_state { - if let Some(expired_by) = s.update_event_ts(now, &key)? { + if let Some(expired_by) = + s.get_expire_duration_and_update_event_ts(now, &key)? + { max_expired_by = max_expired_by.max(Some(expired_by)); continue; }