From a45017ad714e3bf400f55dec14a531bf3f0402ff Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 16 May 2024 19:41:03 +0800 Subject: [PATCH] feat(flow): expire arrange according to time_index type (#3956) * feat: render_reduce's arrangement expire after time passed * feat: set expire when create flow --- src/flow/src/adapter/worker.rs | 2 +- src/flow/src/compute/render.rs | 2 +- src/flow/src/compute/render/reduce.rs | 19 +++++++++++++++++-- src/flow/src/compute/state.rs | 10 ++++++++++ src/flow/src/utils.rs | 16 ++++++++++++++++ 5 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 1dc41db048..659c6fedf8 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -236,7 +236,6 @@ impl<'s> Worker<'s> { create_if_not_exist: bool, err_collector: ErrCollector, ) -> Result, Error> { - let _ = expire_when; let already_exist = self.task_states.contains_key(&flow_id); match (already_exist, create_if_not_exist) { (true, true) => return Ok(None), @@ -248,6 +247,7 @@ impl<'s> Worker<'s> { err_collector, ..Default::default() }; + cur_task_state.state.set_expire_after(expire_when); { let mut ctx = cur_task_state.new_ctx(sink_id); diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index bf298e86bc..8279974b47 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -111,7 +111,7 @@ impl<'referred, 'df> Context<'referred, 'df> { input, key_val_plan, reduce_plan, - } => self.render_reduce(input, key_val_plan, reduce_plan), + } => self.render_reduce(input, key_val_plan, reduce_plan, plan.typ), Plan::Join { .. } => NotImplementedSnafu { reason: "Join is still WIP", } diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index da2bb11f4b..c43ce54f8c 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -29,8 +29,8 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector use crate::expr::error::{DataTypeSnafu, InternalSnafu}; use crate::expr::{AggregateExpr, EvalError, ScalarExpr}; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; -use crate::repr::{self, DiffRow, KeyValDiffRow, Row}; -use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter}; +use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row}; +use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager}; impl<'referred, 'df> Context<'referred, 'df> { const REDUCE: &'static str = "reduce"; @@ -42,6 +42,7 @@ impl<'referred, 'df> Context<'referred, 'df> { input: Box, key_val_plan: KeyValPlan, reduce_plan: ReducePlan, + output_type: RelationType, ) -> Result { let input = self.render_plan(*input)?; // first assembly key&val that's ((Row, Row), tick, diff) @@ -52,6 +53,15 @@ impl<'referred, 'df> Context<'referred, 'df> { // TODO(discord9): config global expire time from self let arrange_handler = self.compute_state.new_arrange(None); + + if let (Some(time_index), Some(expire_after)) = + (output_type.time_index, self.compute_state.expire_after()) + { + let expire_man = + KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index))); + arrange_handler.write().set_expire_state(expire_man); + } + // reduce need full arrangement to be able to query all keys let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu { reason: "No write is expected at this point", @@ -874,6 +884,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); @@ -948,6 +959,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); @@ -1028,6 +1040,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); @@ -1104,6 +1117,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); @@ -1195,6 +1209,7 @@ mod test { Box::new(input_plan.with_types(typ)), key_val_plan, reduce_plan, + RelationType::empty(), ) .unwrap(); diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs index a9a431de97..a935600554 100644 --- a/src/flow/src/compute/state.rs +++ b/src/flow/src/compute/state.rs @@ -42,6 +42,8 @@ pub struct DataflowState { /// save all used arrange in this dataflow, since usually there is no delete operation /// we can just keep track of all used arrange and schedule subgraph when they need to be updated arrange_used: Vec, + /// the time arrangement need to be expired after a certain time in milliseconds + expire_after: Option, } impl DataflowState { @@ -99,6 +101,14 @@ impl DataflowState { pub fn get_err_collector(&self) -> ErrCollector { self.err_collector.clone() } + + pub fn set_expire_after(&mut self, after: Option) { + self.expire_after = after; + } + + pub fn expire_after(&self) -> Option { + self.expire_after + } } #[derive(Debug, Clone)] diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index a6887a753b..93edf176e7 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -31,6 +31,7 @@ use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow, pub type Batch = BTreeMap>; /// A spine of batches, arranged by timestamp +/// TODO(discord9): consider internally index by key, value, and timestamp for faster lookup pub type Spine = BTreeMap; /// Determine when should a key expire according to it's event timestamp in key. @@ -51,6 +52,17 @@ pub struct KeyExpiryManager { } impl KeyExpiryManager { + pub fn new( + key_expiration_duration: Option, + event_timestamp_from_row: Option, + ) -> Self { + Self { + event_ts_to_key: Default::default(), + key_expiration_duration, + event_timestamp_from_row, + } + } + /// Extract event timestamp from key row. /// /// If no expire state is set, return None. @@ -177,6 +189,10 @@ impl Arrangement { } } + pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) { + self.expire_state = Some(expire_state); + } + /// Apply updates into spine, with no respect of whether the updates are in futures, past, or now. /// /// Return the maximum expire time (already expire by how much time) of all updates if any keys is already expired.