feat(flow): expire arrange according to time_index type (#3956)

* feat: render_reduce's arrangement expire after time passed

* feat: set expire when create flow
This commit is contained in:
discord9
2024-05-16 19:41:03 +08:00
committed by GitHub
parent 0d9e71b653
commit a45017ad71
5 changed files with 45 additions and 4 deletions

View File

@@ -236,7 +236,6 @@ impl<'s> Worker<'s> {
create_if_not_exist: bool,
err_collector: ErrCollector,
) -> Result<Option<FlowId>, Error> {
let _ = expire_when;
let already_exist = self.task_states.contains_key(&flow_id);
match (already_exist, create_if_not_exist) {
(true, true) => return Ok(None),
@@ -248,6 +247,7 @@ impl<'s> Worker<'s> {
err_collector,
..Default::default()
};
cur_task_state.state.set_expire_after(expire_when);
{
let mut ctx = cur_task_state.new_ctx(sink_id);

View File

@@ -111,7 +111,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
input,
key_val_plan,
reduce_plan,
} => self.render_reduce(input, key_val_plan, reduce_plan),
} => self.render_reduce(input, key_val_plan, reduce_plan, plan.typ),
Plan::Join { .. } => NotImplementedSnafu {
reason: "Join is still WIP",
}

View File

@@ -29,8 +29,8 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector
use crate::expr::error::{DataTypeSnafu, InternalSnafu};
use crate::expr::{AggregateExpr, EvalError, ScalarExpr};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, DiffRow, KeyValDiffRow, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter};
use crate::repr::{self, DiffRow, KeyValDiffRow, RelationType, Row};
use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, KeyExpiryManager};
impl<'referred, 'df> Context<'referred, 'df> {
const REDUCE: &'static str = "reduce";
@@ -42,6 +42,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
input: Box<TypedPlan>,
key_val_plan: KeyValPlan,
reduce_plan: ReducePlan,
output_type: RelationType,
) -> Result<CollectionBundle, Error> {
let input = self.render_plan(*input)?;
// first assembly key&val that's ((Row, Row), tick, diff)
@@ -52,6 +53,15 @@ impl<'referred, 'df> Context<'referred, 'df> {
// TODO(discord9): config global expire time from self
let arrange_handler = self.compute_state.new_arrange(None);
if let (Some(time_index), Some(expire_after)) =
(output_type.time_index, self.compute_state.expire_after())
{
let expire_man =
KeyExpiryManager::new(Some(expire_after), Some(ScalarExpr::Column(time_index)));
arrange_handler.write().set_expire_state(expire_man);
}
// reduce need full arrangement to be able to query all keys
let arrange_handler_inner = arrange_handler.clone_full_arrange().context(PlanSnafu {
reason: "No write is expected at this point",
@@ -874,6 +884,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();
@@ -948,6 +959,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();
@@ -1028,6 +1040,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();
@@ -1104,6 +1117,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();
@@ -1195,6 +1209,7 @@ mod test {
Box::new(input_plan.with_types(typ)),
key_val_plan,
reduce_plan,
RelationType::empty(),
)
.unwrap();

View File

@@ -42,6 +42,8 @@ pub struct DataflowState {
/// save all used arrange in this dataflow, since usually there is no delete operation
/// we can just keep track of all used arrange and schedule subgraph when they need to be updated
arrange_used: Vec<ArrangeHandler>,
/// the time arrangement need to be expired after a certain time in milliseconds
expire_after: Option<Timestamp>,
}
impl DataflowState {
@@ -99,6 +101,14 @@ impl DataflowState {
pub fn get_err_collector(&self) -> ErrCollector {
self.err_collector.clone()
}
pub fn set_expire_after(&mut self, after: Option<repr::Duration>) {
self.expire_after = after;
}
pub fn expire_after(&self) -> Option<Timestamp> {
self.expire_after
}
}
#[derive(Debug, Clone)]

View File

@@ -31,6 +31,7 @@ use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow,
pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
/// A spine of batches, arranged by timestamp
/// TODO(discord9): consider internally index by key, value, and timestamp for faster lookup
pub type Spine = BTreeMap<Timestamp, Batch>;
/// Determine when should a key expire according to it's event timestamp in key.
@@ -51,6 +52,17 @@ pub struct KeyExpiryManager {
}
impl KeyExpiryManager {
pub fn new(
key_expiration_duration: Option<Duration>,
event_timestamp_from_row: Option<ScalarExpr>,
) -> Self {
Self {
event_ts_to_key: Default::default(),
key_expiration_duration,
event_timestamp_from_row,
}
}
/// Extract event timestamp from key row.
///
/// If no expire state is set, return None.
@@ -177,6 +189,10 @@ impl Arrangement {
}
}
pub fn set_expire_state(&mut self, expire_state: KeyExpiryManager) {
self.expire_state = Some(expire_state);
}
/// Apply updates into spine, with no respect of whether the updates are in futures, past, or now.
///
/// Return the maximum expire time (already expire by how much time) of all updates if any keys is already expired.