fix: expire state

This commit is contained in:
discord9
2024-05-22 17:50:26 +08:00
parent 6659f3cc62
commit e8660a6f7e
5 changed files with 76 additions and 19 deletions

View File

@@ -26,7 +26,7 @@ use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::{Context, SubgraphArg};
use crate::compute::state::Scheduler;
use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff};
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu};
use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
@@ -397,6 +397,24 @@ fn reduce_accum_subgraph(
// TODO(discord9): consider key-based lock
let mut arrange = arrange.write();
for (key, value_diffs) in key_to_vals {
if let Some(expire_man) = &arrange.get_expire_state() {
let mut is_expired = false;
err_collector.run(|| {
if let Some(expired) = expire_man.get_expire_duration(now, &key)? {
is_expired = true;
DataAlreadyExpiredSnafu {
expired_by: expired,
}
.fail()
} else {
Ok(())
}
});
if is_expired {
// errors already collected, we can just continue to next key
continue;
}
}
let col_diffs = {
let row_len = value_diffs[0].0.len();
let res = err_collector.run(|| get_col_diffs(value_diffs, row_len));

View File

@@ -100,4 +100,11 @@ pub enum EvalError {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Incoming data already expired by {} ms", expired_by))]
DataAlreadyExpired {
expired_by: i64,
#[snafu(implicit)]
location: Location,
},
}

View File

@@ -45,6 +45,8 @@ impl TypedExpr {
impl TypedExpr {
/// expand multi-value expression to multiple expressions with new indices
///
/// Currently it just mean expand `TumbleWindow` to `TumbleWindowFloor` and `TumbleWindowCeiling`
pub fn expand_multi_value(
input_typ: &RelationType,
exprs: &[TypedExpr],

View File

@@ -434,9 +434,10 @@ mod test {
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
/// TODO(discord9): add more illegal sql tests
#[tokio::test]
async fn test_tumble_compsite() {
async fn test_tumble_composite() {
let engine = create_test_query_engine();
let sql =
"SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
@@ -469,12 +470,6 @@ mod test {
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
};
let expected = TypedPlan {
typ: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false), // number
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
]),
// TODO(discord9): mfp indirectly ref to key columns
/*
.with_key(vec![1])
@@ -536,11 +531,13 @@ mod test {
}
.with_types(
RelationType::new(vec![
// keys
ColumnType::new(CDT::datetime_datatype(), false), // window start(time index)
ColumnType::new(CDT::datetime_datatype(), false), // window end(pk)
ColumnType::new(CDT::uint32_datatype(), false), // number(pk)
ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number)
ColumnType::new(CDT::uint64_datatype(), true), // avg.count(number)
// values
ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number)
ColumnType::new(CDT::uint64_datatype(), true), // avg.count(number)
])
.with_key(vec![1, 2])
.with_time_index(Some(0)),
@@ -548,8 +545,8 @@ mod test {
),
mfp: MapFilterProject::new(5)
.map(vec![
avg_expr,
ScalarExpr::Column(2),
avg_expr, // avg(number)
ScalarExpr::Column(2), // number(pk)
ScalarExpr::Column(5),
ScalarExpr::Column(0),
ScalarExpr::Column(1),
@@ -558,6 +555,12 @@ mod test {
.project(vec![6, 7, 8, 9])
.unwrap(),
},
typ: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false), // number
ColumnType::new(CDT::uint64_datatype(), true), // avg(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
]),
};
assert_eq!(flow_plan, expected);
}

View File

@@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::ops::Bound;
use std::sync::Arc;
use common_telemetry::debug;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
@@ -86,7 +87,7 @@ impl KeyExpiryManager {
///
/// - If given key is expired by now (that is less than `now - expiry_duration`), return the amount of time it's expired.
/// - If it's not expired, return None
pub fn update_event_ts(
pub fn get_expire_duration_and_update_event_ts(
&mut self,
now: Timestamp,
row: &Row,
@@ -95,6 +96,26 @@ impl KeyExpiryManager {
return Ok(None);
};
self.event_ts_to_key
.entry(event_ts)
.or_default()
.insert(row.clone());
self.get_expire_duration(now, row)
}
/// Get the expire duration of a key, if it's expired by now.
///
/// Return None if the key is not expired
pub fn get_expire_duration(
&self,
now: Timestamp,
row: &Row,
) -> Result<Option<Duration>, EvalError> {
let Some(event_ts) = self.extract_event_ts(row)? else {
return Ok(None);
};
if let Some(expire_time) = self.compute_expiration_timestamp(now) {
if expire_time > event_ts {
// return how much time it's expired
@@ -102,10 +123,6 @@ impl KeyExpiryManager {
}
}
self.event_ts_to_key
.entry(event_ts)
.or_default()
.insert(row.clone());
Ok(None)
}
@@ -189,6 +206,10 @@ impl Arrangement {
}
}
pub fn get_expire_state(&self) -> Option<&KeyExpiryManager> {
self.expire_state.as_ref()
}
pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) {
self.expire_state = Some(expire_state);
}
@@ -208,8 +229,12 @@ impl Arrangement {
for ((key, val), update_ts, diff) in updates {
// check if the key is expired
if let Some(s) = &mut self.expire_state {
if let Some(expired_by) = s.update_event_ts(now, &key)? {
if let Some(expired_by) = s.get_expire_duration_and_update_event_ts(now, &key)? {
max_expired_by = max_expired_by.max(Some(expired_by));
debug!(
"Expired key: {:?}, expired by: {:?} with time being now={}",
key, expired_by, now
);
continue;
}
}
@@ -335,7 +360,9 @@ impl Arrangement {
for (key, updates) in batch {
// check if the key is expired
if let Some(s) = &mut self.expire_state {
if let Some(expired_by) = s.update_event_ts(now, &key)? {
if let Some(expired_by) =
s.get_expire_duration_and_update_event_ts(now, &key)?
{
max_expired_by = max_expired_by.max(Some(expired_by));
continue;
}