From 659d34a170251fea5c910f5b8842c7274745867a Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 24 Apr 2024 15:09:53 +0800 Subject: [PATCH] refactor(flow): refine comments and code (#3785) * refactor(flow): refine comments and code Signed-off-by: Zhenchi * doc: description of the properties of removed keys Signed-off-by: Zhenchi * fix: `get`'s fast path for cur val --------- Signed-off-by: Zhenchi Co-authored-by: discord9 --- src/flow/src/compute/render/map.rs | 3 +- src/flow/src/compute/state.rs | 4 +- src/flow/src/lib.rs | 1 + src/flow/src/utils.rs | 904 +++++++++++++++-------------- 4 files changed, 470 insertions(+), 442 deletions(-) diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index a0e29f15a6..4264693391 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -124,7 +124,6 @@ fn mfp_subgraph( // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time. // 2. Output the updates. // 3. Truncate all updates within that range. 
- let from = arrange.read().last_compaction_time().map(|n| n + 1); let from = from.unwrap_or(repr::Timestamp::MIN); let output_kv = arrange.read().get_updates_in_range(from..=now); @@ -135,7 +134,7 @@ fn mfp_subgraph( .collect_vec(); send.give(output); let run_compaction = || { - arrange.write().compaction_to(now)?; + arrange.write().compact_to(now)?; Ok(()) }; err_collector.run(run_compaction); diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs index c062aae480..5bcb3a7ab1 100644 --- a/src/flow/src/compute/state.rs +++ b/src/flow/src/compute/state.rs @@ -46,9 +46,7 @@ pub struct DataflowState { impl DataflowState { pub fn new_arrange(&mut self, name: Option>) -> ArrangeHandler { - let arrange = name - .map(Arrangement::new_with_name) - .unwrap_or_else(Arrangement::new); + let arrange = name.map(Arrangement::new_with_name).unwrap_or_default(); let arr = ArrangeHandler::from(arrange); // mark this arrange as used in this dataflow diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index a7392a69f4..dac53002cb 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -16,6 +16,7 @@ //! It can transform substrait plan into it's own plan and execute it. //! It also contains definition of expression, adapter and plan, and internal state management. 
+#![feature(let_chains)] #![allow(dead_code)] #![allow(unused_imports)] #![warn(missing_docs)] diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 6a56d8c5d8..a6887a753b 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -33,23 +33,27 @@ pub type Batch = BTreeMap>; /// A spine of batches, arranged by timestamp pub type Spine = BTreeMap; -/// Determine when should a key expire according to it's event timestamp in key, -/// if a key is expired, any future updates to it should be ignored -/// Note that key is expired by it's event timestamp(contained in the key), not by the time it's inserted(system timestamp) +/// Determine when should a key expire according to it's event timestamp in key. +/// +/// If a key is expired, any future updates to it should be ignored. +/// +/// Note that key is expired by it's event timestamp (contained in the key), not by the time it's inserted (system timestamp). #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)] pub struct KeyExpiryManager { - /// a map from event timestamp to key, used for expire keys + /// A map from event timestamp to key, used for expire keys. event_ts_to_key: BTreeMap>, - /// duration after which a key is considered expired, and will be removed from state + + /// Duration after which a key is considered expired, and will be removed from state key_expiration_duration: Option, - /// using this to get timestamp from key row + + /// Expression to get timestamp from key row event_timestamp_from_row: Option, } impl KeyExpiryManager { - /// extract event timestamp from key row + /// Extract event timestamp from key row. /// - /// if no expire state is set, return None + /// If no expire state is set, return None. 
pub fn extract_event_ts(&self, row: &Row) -> Result, EvalError> { let ts = self .event_timestamp_from_row @@ -61,101 +65,107 @@ impl KeyExpiryManager { Ok(ts) } - /// return timestamp that should be expired by the time `now` by compute `now - expiration_duration` + /// Return timestamp that should be expired by the time `now` by compute `now - expiration_duration` pub fn compute_expiration_timestamp(&self, now: Timestamp) -> Option { self.key_expiration_duration.map(|d| now - d) } - /// update the event timestamp to key mapping + /// Update the event timestamp to key mapping. /// - /// if given key is expired by now(that is lesser than `now - expiry_duration`), return the amount of time it's expired - /// if it's not expired, return None + /// - If given key is expired by now (that is less than `now - expiry_duration`), return the amount of time it's expired. + /// - If it's not expired, return None pub fn update_event_ts( &mut self, now: Timestamp, row: &Row, ) -> Result, EvalError> { - let ts = if let Some(event_ts) = self.extract_event_ts(row)? { - let ret = self.compute_expiration_timestamp(now).and_then(|e| { - if e > event_ts { - // return how much time it's expired - Some(e - event_ts) - } else { - None - } - }); - if let Some(expire_by) = ret { - return Ok(Some(expire_by)); - } - event_ts - } else { + let Some(event_ts) = self.extract_event_ts(row)? else { return Ok(None); }; + if let Some(expire_time) = self.compute_expiration_timestamp(now) { + if expire_time > event_ts { + // return how much time it's expired + return Ok(Some(expire_time - event_ts)); + } + } + self.event_ts_to_key - .entry(ts) + .entry(event_ts) .or_default() .insert(row.clone()); Ok(None) } + + /// Remove expired keys from the state, and return an iterator of removed keys with + /// event_ts less than expire time (i.e. now - key_expiration_duration). 
+ pub fn remove_expired_keys(&mut self, now: Timestamp) -> Option> { + let expire_time = self.compute_expiration_timestamp(now)?; + + let mut before = self.event_ts_to_key.split_off(&expire_time); + std::mem::swap(&mut before, &mut self.event_ts_to_key); + + Some(before.into_iter().flat_map(|(_ts, keys)| keys.into_iter())) + } } -/// A shared state of key-value pair for various state -/// in dataflow execution +/// A shared state of key-value pair for various state in dataflow execution. /// /// i.e: Mfp operator with temporal filter need to store it's future output so that it can add now, and delete later. -/// To get all needed updates in a time span, use [`get_updates_in_range`] +/// To get all needed updates in a time span, use [`get_updates_in_range`]. /// -/// And reduce operator need full state of it's output, so that it can query(and modify by calling [`apply_updates`]) +/// And reduce operator need full state of it's output, so that it can query (and modify by calling [`apply_updates`]) /// existing state, also need a way to expire keys. To get a key's current value, use [`get`] with time being `now` /// so it's like: /// `mfp operator -> arrange(store futures only, no expire) -> reduce operator <-> arrange(full, with key expiring time) -> output` /// /// Note the two way arrow between reduce operator and arrange, it's because reduce operator need to query existing state -/// and also need to update existing state -#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)] +/// and also need to update existing state. +#[derive(Debug, Clone, Default, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)] pub struct Arrangement { - /// all the updates that pending to be applied - /// arranged in time -> (key -> (new_val, diff)) - /// all updates where the update time is greater than the last key but less than or equal to the current key - /// are updates are categorized under current key. 
- /// - /// that is: `last key < update time <= current key` - /// or for time that's before the first key, just being categorized under the first key - /// The first key is always `now` which include consolidated updates from past, representing the current state of arrangement - /// - /// Note that for a given time and key, there might be a bunch of updates and they should be applied in order - /// And for consolidated batch(i.e. btach representing now), there should be only one update for each key with `diff==1` - /// - /// And since most time a key gots updated by first delete then insert, small vec with size of 2 make sense - /// TODO: batch size balancing? - spine: Spine, - /// if set to false, will not update current value of the arrangement, useful for case like `map -> arrange -> reduce` - /// - /// in which `arrange` operator only need future updates, and don't need to keep all updates - full_arrangement: bool, - /// flag to mark that this arrangement haven't been written to, so that it can be cloned and shared - is_written: bool, - /// manage the expire state of the arrangement - expire_state: Option, - /// the time that the last compaction happened, also know as current time - last_compaction_time: Option, + /// A name or identifier for the arrangement which can be used for debugging or logging purposes. + /// This field is not critical to the functionality but aids in monitoring and management of arrangements. name: Vec, + + /// Manages a collection of pending updates in a `BTreeMap` where each key is a timestamp and each value is a `Batch` of updates. + /// Updates are grouped into batches based on their timestamps. + /// Each batch covers a range of time from the last key (exclusive) to the current key (inclusive). + /// + /// - Updates with a timestamp (`update_ts`) that falls between two keys are placed in the batch of the higher key. + /// For example, if the keys are `1, 5, 7, 9` and `update_ts` is `6`, the update goes into the batch with key `7`. 
+ /// - Updates with a timestamp before the first key are categorized under the first key. + /// - Updates with a timestamp greater than the highest key result in a new batch being created with that timestamp as the key. + /// + /// The first key represents the current state and includes consolidated updates from the past. It is always set to `now`. + /// Each key should have only one update per batch with a `diff=1` for the batch representing the current time (`now`). + /// + /// Since updates typically occur as a delete followed by an insert, a small vector of size 2 is used to store updates for efficiency. + /// + /// TODO: Consider balancing the batch size? + spine: Spine, + + /// Indicates whether the arrangement maintains a complete history of updates. + /// - `true`: Maintains all past and future updates, necessary for full state reconstruction at any point in time. + /// - `false`: Only future updates are retained, optimizing for scenarios where past state is irrelevant and conserving resources. + /// Useful for case like `map -> arrange -> reduce`. + full_arrangement: bool, + + /// Indicates whether the arrangement has been modified since its creation. + /// - `true`: The arrangement has been written to, meaning it has received updates. + /// Cloning this arrangement is generally unsafe as it may lead to inconsistencies if the clone is modified independently. + /// However, cloning is safe when both the original and the clone require a full arrangement, as this ensures consistency. + /// - `false`: The arrangement is in its initial state and has not been modified. It can be safely cloned and shared + /// without concerns of carrying over unintended state changes. + is_written: bool, + + /// Manage the expire state of the arrangement. + expire_state: Option, + + /// The time that the last compaction happened, also known as the current time. 
+ last_compaction_time: Option, } impl Arrangement { - /// create a new empty arrangement - pub fn new() -> Self { - Self { - spine: Default::default(), - full_arrangement: false, - is_written: false, - expire_state: None, - last_compaction_time: None, - name: vec![], - } - } - pub fn new_with_name(name: Vec) -> Self { Self { spine: Default::default(), @@ -167,288 +177,270 @@ impl Arrangement { } } - /// apply updates into spine, with no respect of whether the updates are in futures, past, or now + /// Apply updates into spine, with no respect of whether the updates are in futures, past, or now. /// - /// return the maximum expire time(already expire by how much time) of all updates if any keys is already expired + /// Return the maximum expire time (already expire by how much time) of all updates if any keys is already expired. pub fn apply_updates( &mut self, now: Timestamp, updates: Vec, ) -> Result, EvalError> { - let mut max_late_by: Option = None; - if !self.is_written { - self.is_written = true; - } - for ((key, val), ts, diff) in updates { - // keep rows with expired event timestamp from being updated + self.is_written = true; + + let mut max_expired_by: Option = None; + + for ((key, val), update_ts, diff) in updates { + // check if the key is expired if let Some(s) = &mut self.expire_state { - if let Some(late_by) = s.update_event_ts(now, &key)? { - max_late_by = Some(max_late_by.map_or(late_by, |v| v.max(late_by))); + if let Some(expired_by) = s.update_event_ts(now, &key)? { + max_expired_by = max_expired_by.max(Some(expired_by)); continue; } } - // the first batch with key that's greater or equal to ts - let batch = if let Some((_, batch)) = self.spine.range_mut(ts..).next() { - batch - } else { - // if no batch with `batch key >= ts`, then create a new batch with key being `ts` - self.spine.entry(ts).or_default() - }; - + // If the `highest_ts` is less than `update_ts`, we need to create a new batch with key being `update_ts`. 
+ if self + .spine + .last_key_value() + .map(|(highest_ts, _)| *highest_ts < update_ts) + .unwrap_or(true) { - let key_updates = batch.entry(key).or_insert(smallvec![]); - key_updates.push((val, ts, diff)); - // a stable sort make updates sort in order of insertion - // without changing the order of updates within same tick - key_updates.sort_by_key(|r| r.1); + self.spine.insert(update_ts, Default::default()); } + + // Get the first batch with key that's greater or equal to `update_ts`. + let (_, batch) = self + .spine + .range_mut(update_ts..) + .next() + .expect("Previous insert should have created the batch"); + + let key_updates = batch.entry(key).or_default(); + key_updates.push((val, update_ts, diff)); + + // a stable sort make updates sort in order of insertion + // without changing the order of updates within same tick + key_updates.sort_by_key(|(_val, ts, _diff)| *ts); } - Ok(max_late_by) + Ok(max_expired_by) } - /// find out the time of next update in the future - /// that is the next update with `timestamp > now` + /// Find out the time of next update in the future that is the next update with `timestamp > now`. pub fn get_next_update_time(&self, now: &Timestamp) -> Option { // iter over batches that only have updates of `timestamp>now` and find the first non empty batch, then get the minimum timestamp in that batch - let next_batches = self.spine.range((Bound::Excluded(now), Bound::Unbounded)); - for (_ts, batch) in next_batches { + for (_ts, batch) in self.spine.range((Bound::Excluded(now), Bound::Unbounded)) { let min_ts = batch .iter() - .flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts)) + .flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts).min()) .min(); - if let Some(min_ts) = min_ts { - return Some(min_ts); - } else { - continue; + + if min_ts.is_some() { + return min_ts; } } - // all batches are empty, return now + None } - /// get the last compaction time + /// Get the last compaction time. 
pub fn last_compaction_time(&self) -> Option { self.last_compaction_time } - /// split spine off at `now`, and return the spine that's before `now`(including `now`) - fn split_lte(&mut self, now: &Timestamp) -> Spine { - let mut before = self.spine.split_off(&(now + 1)); + /// Split spine off at `split_ts`, and return the spine that's before `split_ts` (including `split_ts`). + fn split_spine_le(&mut self, split_ts: &Timestamp) -> Spine { + self.split_batch_at(split_ts); + let mut before = self.spine.split_off(&(split_ts + 1)); std::mem::swap(&mut before, &mut self.spine); - - // if before's last key == now, then all the keys we needed are found - if before - .last_key_value() - .map(|(k, _v)| *k == *now) - .unwrap_or(false) - { - return before; - } - - // also need to move all keys from the first batch in spine with timestamp<=now to before - // we know that all remaining keys to be split off are last key < key <= now, we will make them into a new batch - if let Some(mut first_batch) = self.spine.first_entry() { - let mut new_batch: Batch = Default::default(); - // remove all keys with val of empty vec - first_batch.get_mut().retain(|key, updates| { - // remove keys <= now from updates - updates.retain(|(val, ts, diff)| { - if *ts <= *now { - new_batch.entry(key.clone()).or_insert(smallvec![]).push(( - val.clone(), - *ts, - *diff, - )); - } - *ts > *now - }); - !updates.is_empty() - }); - - before.entry(*now).or_default().extend(new_batch); - } before } - /// advance time to `now` and consolidate all older(`now` included) updates to the first key + /// Split the batch at `split_ts` into two parts. + fn split_batch_at(&mut self, split_ts: &Timestamp) { + // FAST PATH: + // + // The `split_ts` hit the boundary of a batch, nothing to do. + if self.spine.contains_key(split_ts) { + return; + } + + let Some((_, batch_to_split)) = self.spine.range_mut(split_ts..).next() else { + return; // No batch to split, nothing to do. 
+ }; + + // SLOW PATH: + // + // The `split_ts` is in the middle of a batch, we need to split the batch into two parts. + let mut new_batch = Batch::default(); + + batch_to_split.retain(|key, updates| { + let mut new_updates = SmallVec::default(); + + updates.retain(|(val, ts, diff)| { + if *ts <= *split_ts { + // Move the updates that are less than or equal to `split_ts` to the new batch. + new_updates.push((val.clone(), *ts, *diff)); + } + // Keep the updates that are greater than `split_ts` in the current batch. + *ts > *split_ts + }); + + if !new_updates.is_empty() { + new_batch.insert(key.clone(), new_updates); + } + + // Keep the key in the current batch if it still has updates. + !updates.is_empty() + }); + + if !new_batch.is_empty() { + self.spine.insert(*split_ts, new_batch); + } + } + + /// Advance time to `now` and consolidate all older (`now` included) updates to the first key. /// - /// return the maximum expire time(already expire by how much time) of all updates if any keys is already expired - pub fn compaction_to(&mut self, now: Timestamp) -> Result, EvalError> { - let mut max_late_by: Option = None; - - let should_compact = self.split_lte(&now); + /// Return the maximum expire time(already expire by how much time) of all updates if any keys is already expired. + pub fn compact_to(&mut self, now: Timestamp) -> Result, EvalError> { + let mut max_expired_by: Option = None; + let batches_to_compact = self.split_spine_le(&now); self.last_compaction_time = Some(now); - // if a full arrangement is not needed, we can just discard everything before and including now + + // If a full arrangement is not needed, we can just discard everything before and including now, if !self.full_arrangement { return Ok(None); } - // else we update them into current key value pairs - let mut compacted_batch: BTreeMap> = Default::default(); - for (_, batch) in should_compact { + // else we update them into current state. 
+ let mut compacting_batch = Batch::default(); + + for (_, batch) in batches_to_compact { for (key, updates) in batch { + // check if the key is expired if let Some(s) = &mut self.expire_state { - if let Some(late_by) = s.update_event_ts(now, &key)? { - max_late_by = Some(max_late_by.map_or(late_by, |v| v.max(late_by))); + if let Some(expired_by) = s.update_event_ts(now, &key)? { + max_expired_by = max_expired_by.max(Some(expired_by)); continue; } } - // if diff cancel out each other, then remove the key - let mut old_row: Option = - compacted_batch.get(&key).and_then(|v| v.first()).cloned(); - for new_row in updates { - old_row = compact_diff_row(old_row, &new_row); + let mut row = compacting_batch + .remove(&key) + // only one row in the updates during compaction + .and_then(|mut updates| updates.pop()); + + for update in updates { + row = compact_diff_row(row, &update); } - if let Some(compacted_update) = old_row { - compacted_batch.insert(key, smallvec![compacted_update]); - } else { - compacted_batch.remove(&key); + if let Some(compacted_update) = row { + compacting_batch.insert(key, smallvec![compacted_update]); } } } // insert the compacted batch into spine with key being `now` - self.spine.insert(now, compacted_batch); - Ok(max_late_by) + self.spine.insert(now, compacting_batch); + Ok(max_expired_by) } - /// get the updates of the arrangement from the given range of time + /// Get the updates of the arrangement from the given range of time. pub fn get_updates_in_range + Clone>( &self, range: R, ) -> Vec { - let mut result = vec![]; - // three part: - // 1.the starting batch with first key >= range.start, which may contain updates that not in range - // 2. the batches with key in range - // 3. 
the last batch with first key > range.end, which may contain updates that are in range - let mut is_first = true; - for (_ts, batch) in self.spine.range(range.clone()) { - if is_first { - for (key, updates) in batch { - let iter = updates - .iter() - .filter(|(_val, ts, _diff)| range.contains(ts)) - .map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff)); - result.extend(iter); - } - is_first = false; - } else { - for (key, updates) in batch.clone() { - result.extend( - updates - .iter() - .map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff)), - ); - } - } - } - - // deal with boundary include start and end - // and for the next batch with upper_bound >= range.end - // we need to search for updates within range - let neg_bound = match range.end_bound() { - Bound::Included(b) => { - // if boundary is aligned, the last batch in range actually cover the full range - // then there will be no further keys we need in the next batch - if self.spine.contains_key(b) { - return result; - } - Bound::Excluded(*b) - } - Bound::Excluded(b) => Bound::Included(*b), - Bound::Unbounded => return result, + // Include the next batch in case the range is not aligned with the boundary of a batch. 
+ let batches = match range.end_bound() { + Bound::Included(t) => self.spine.range(range.clone()).chain( + self.spine + .range((Bound::Excluded(t), Bound::Unbounded)) + .next(), + ), + Bound::Excluded(t) => self.spine.range(range.clone()).chain( + self.spine + .range((Bound::Included(t), Bound::Unbounded)) + .next(), + ), + _ => self.spine.range(range.clone()).chain(None), }; - let search_range = (neg_bound, Bound::Unbounded); - if let Some(last_batch) = self.spine.range(search_range).next() { - for (key, updates) in last_batch.1 { - let iter = updates - .iter() - .filter(|(_val, ts, _diff)| range.contains(ts)) - .map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff)); - result.extend(iter); - } - }; - result - } - /// expire keys in now that are older than expire_time, intended for reducing memory usage and limit late data arrive - pub fn trunc_expired(&mut self, now: Timestamp) { - if let Some(s) = &mut self.expire_state { - let expire_time = if let Some(t) = s.compute_expiration_timestamp(now) { - t - } else { - // never expire - return; - }; - // find all keys smaller than or equal expire_time and silently remove them - let mut after = s.event_ts_to_key.split_off(&(expire_time + 1)); - std::mem::swap(&mut s.event_ts_to_key, &mut after); - let before = after; - for key in before.into_iter().flat_map(|i| i.1.into_iter()) { - for (_ts, batch) in self.spine.iter_mut() { - batch.remove(&key); - } - } - } - } - - /// get current state of things - /// useful for query existing keys(i.e. 
reduce and join operator need to query existing state) - pub fn get(&self, now: Timestamp, key: &Row) -> Option<(Row, Timestamp, Diff)> { - if self.full_arrangement - && self - .spine - .first_key_value() - .map(|(ts, _)| *ts >= now) - .unwrap_or(false) - { - self.spine - .first_key_value() - .and_then(|(_ts, batch)| batch.get(key).and_then(|v| v.first()).cloned()) - } else { - // check keys <= now to know current value - let mut final_val = None; - - let with_extra_batch = { - let unaligned = self.spine.range(..=now); - if unaligned - .clone() - .last() - .map(|(ts, _)| *ts == now) - .unwrap_or(false) - { - // this extra chain is there just to make type the same - unaligned.chain(None) - } else { - // if the last key is not equal to now, then we need to include the next batch - // because we know last batch key < now < next batch key - // therefore next batch may contain updates that we want - unaligned.chain( - self.spine - .range((Bound::Excluded(now), Bound::Unbounded)) - .next(), - ) - } - }; - for (ts, batch) in with_extra_batch { - if let Some(new_rows) = batch.get(key).map(|v| v.iter()) { - if *ts <= now { - for new_row in new_rows { - final_val = compact_diff_row(final_val, new_row); - } - } else { - for new_row in new_rows.filter(|new_row| new_row.1 <= now) { - final_val = compact_diff_row(final_val, new_row); - } + let mut res = vec![]; + for (_, batch) in batches { + for (key, updates) in batch { + for (val, ts, diff) in updates { + if range.contains(ts) { + res.push(((key.clone(), val.clone()), *ts, *diff)); } } } - final_val } + res + } + + /// Expire keys in now that are older than expire_time, intended for reducing memory usage and limit late data arrive + pub fn truncate_expired_keys(&mut self, now: Timestamp) { + if let Some(s) = &mut self.expire_state { + if let Some(expired_keys) = s.remove_expired_keys(now) { + for key in expired_keys { + for (_, batch) in self.spine.iter_mut() { + batch.remove(&key); + } + } + } + } + } + + /// Get current 
state of things. + /// + /// Useful for query existing keys (i.e. reduce and join operator need to query existing state) + pub fn get(&self, now: Timestamp, key: &Row) -> Option { + // FAST PATH: + // + // If `now <= last_compaction_time`, and it's full arrangement, we can directly return the value + // from the current state (which should be the first batch in the spine if it exist). + if let Some(last_compaction_time) = self.last_compaction_time() + && now <= last_compaction_time + && self.full_arrangement + { + // if the last compaction time's batch is not exist, it means the spine doesn't have it's first batch as current value + return self + .spine + .get(&last_compaction_time) + .and_then(|batch| batch.get(key)) + .and_then(|updates| updates.first().cloned()); + } + + // SLOW PATH: + // + // Accumulate updates from the oldest batch to the batch containing `now`. + + let batches = if self.spine.contains_key(&now) { + // hit the boundary of a batch + self.spine.range(..=now).chain(None) + } else { + // not hit the boundary of a batch, should include the next batch + self.spine.range(..=now).chain( + self.spine + .range((Bound::Excluded(now), Bound::Unbounded)) + .next(), + ) + }; + + let mut final_val = None; + for (ts, batch) in batches { + if let Some(updates) = batch.get(key) { + if *ts <= now { + for update in updates { + final_val = compact_diff_row(final_val, update); + } + } else { + for update in updates.iter().filter(|(_, ts, _)| *ts <= now) { + final_val = compact_diff_row(final_val, update); + } + } + } + } + final_val } } @@ -478,6 +470,7 @@ pub type ArrangeWriter<'a> = tokio::sync::RwLockWriteGuard<'a, Arrangement>; pub struct ArrangeHandler { inner: Arc>, } + impl ArrangeHandler { /// create a new handler from arrangement pub fn from(arr: Arrangement) -> Self { @@ -487,18 +480,18 @@ impl ArrangeHandler { } /// write lock the arrangement - pub fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, Arrangement> { + pub fn write(&self) -> 
ArrangeWriter<'_> { self.inner.blocking_write() } /// read lock the arrangement - pub fn read(&self) -> tokio::sync::RwLockReadGuard<'_, Arrangement> { + pub fn read(&self) -> ArrangeReader<'_> { self.inner.blocking_read() } - /// clone the handler, but only keep the future updates + /// Clone the handler, but only keep the future updates. /// - /// it's a cheap operation, since it's `Arc-ed` and only clone the `Arc` + /// It's a cheap operation, since it's `Arc-ed` and only clone the `Arc`. pub fn clone_future_only(&self) -> Option { if self.read().is_written { return None; @@ -508,11 +501,12 @@ impl ArrangeHandler { }) } - /// clone the handler, but keep all updates - /// prevent illegal clone after the arrange have been written, - /// because that will cause loss of data before clone + /// Clone the handler, but keep all updates. /// - /// it's a cheap operation, since it's `Arc-ed` and only clone the `Arc` + /// Prevent illegal clone after the arrange have been written, + /// because that will cause loss of data before clone. + /// + /// It's a cheap operation, since it's `Arc-ed` and only clone the `Arc`. 
pub fn clone_full_arrange(&self) -> Option { { let zelf = self.read(); @@ -534,31 +528,40 @@ impl ArrangeHandler { #[cfg(test)] mod test { + use std::borrow::Borrow; + + use datatypes::value::Value; + use super::*; + fn lit(v: impl Into) -> Row { + Row::new(vec![v.into()]) + } + + fn kv(key: impl Borrow, value: impl Borrow) -> (Row, Row) { + (key.borrow().clone(), value.borrow().clone()) + } + #[test] fn test_future_get() { // test if apply only future updates, whether get(future_time) can operate correctly - let arr = Arrangement::new(); - let arr = ArrangeHandler::from(arr); + let arr = ArrangeHandler::from(Arrangement::default()); - { - let mut arr = arr.write(); - let key = Row::new(vec![1.into()]); - let updates: Vec = vec![ - ((key.clone(), Row::new(vec![2.into()])), 1, 1), - ((key.clone(), Row::new(vec![3.into()])), 2, 1), - ((key.clone(), Row::new(vec![4.into()])), 3, 1), - ]; - // all updates above are future updates - arr.apply_updates(0, updates).unwrap(); + let mut arr = arr.write(); - assert_eq!(arr.get(1, &key), Some((Row::new(vec![2.into()]), 1, 1))); + let key = lit("a"); + let updates: Vec = vec![ + (kv(&key, lit("b")), 1 /* ts */, 1 /* diff */), + (kv(&key, lit("c")), 2 /* ts */, 1 /* diff */), + (kv(&key, lit("d")), 3 /* ts */, 1 /* diff */), + ]; - assert_eq!(arr.get(2, &key), Some((Row::new(vec![3.into()]), 2, 1))); + // all updates above are future updates + arr.apply_updates(0, updates).unwrap(); - assert_eq!(arr.get(3, &key), Some((Row::new(vec![4.into()]), 3, 1))); - } + assert_eq!(arr.get(1, &key), Some((lit("b"), 1 /* ts */, 1 /* diff */))); + assert_eq!(arr.get(2, &key), Some((lit("c"), 2 /* ts */, 1 /* diff */))); + assert_eq!(arr.get(3, &key), Some((lit("d"), 3 /* ts */, 1 /* diff */))); } #[test] @@ -566,101 +569,126 @@ mod test { // mfp operator's temporal filter need to record future updates so that it can delete on time // i.e. 
insert a record now, delete this record 5 minutes later // they will only need to keep future updates(if downstream don't need full arrangement that is) - let arr = Arrangement::new(); - let arr = ArrangeHandler::from(arr); - let arr1 = arr.clone_full_arrange(); - assert!(arr1.is_some()); - let arr2 = arr.clone_future_only(); - assert!(arr2.is_some()); + let arr = ArrangeHandler::from(Arrangement::default()); + + { + let arr1 = arr.clone_full_arrange(); + assert!(arr1.is_some()); + let arr2 = arr.clone_future_only(); + assert!(arr2.is_some()); + } { let mut arr = arr.write(); let updates: Vec = vec![ - ((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 1, 1), - ((Row::new(vec![2.into()]), Row::new(vec![3.into()])), 2, 1), - ((Row::new(vec![3.into()]), Row::new(vec![4.into()])), 3, 1), + (kv(lit("a"), lit("x")), 1 /* ts */, 1 /* diff */), + (kv(lit("b"), lit("y")), 2 /* ts */, 1 /* diff */), + (kv(lit("c"), lit("z")), 3 /* ts */, 1 /* diff */), ]; // all updates above are future updates arr.apply_updates(0, updates).unwrap(); + assert_eq!( arr.get_updates_in_range(1..=1), - vec![((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 1, 1)] + vec![(kv(lit("a"), lit("x")), 1 /* ts */, 1 /* diff */)] ); assert_eq!(arr.spine.len(), 3); - arr.compaction_to(1).unwrap(); + + arr.compact_to(1).unwrap(); assert_eq!(arr.spine.len(), 3); + + let key = &lit("a"); + assert_eq!(arr.get(3, key), Some((lit("x"), 1 /* ts */, 1 /* diff */))); + let key = &lit("b"); + assert_eq!(arr.get(3, key), Some((lit("y"), 2 /* ts */, 1 /* diff */))); + let key = &lit("c"); + assert_eq!(arr.get(3, key), Some((lit("z"), 3 /* ts */, 1 /* diff */))); } - let arr2 = arr.clone_full_arrange(); - assert!(arr2.is_some()); + assert!(arr.clone_future_only().is_none()); { - let mut arr = arr.write(); + let arr2 = arr.clone_full_arrange().unwrap(); + let mut arr = arr2.write(); assert_eq!(arr.spine.len(), 3); - arr.compaction_to(2).unwrap(); + + arr.compact_to(2).unwrap(); assert_eq!(arr.spine.len(), 2); 
+ let key = &lit("a"); + assert_eq!(arr.get(3, key), Some((lit("x"), 1 /* ts */, 1 /* diff */))); + let key = &lit("b"); + assert_eq!(arr.get(3, key), Some((lit("y"), 2 /* ts */, 1 /* diff */))); + let key = &lit("c"); + assert_eq!(arr.get(3, key), Some((lit("z"), 3 /* ts */, 1 /* diff */))); } } #[test] fn test_reduce_expire_keys() { - let mut arr = Arrangement::new(); + let mut arr = Arrangement::default(); let expire_state = KeyExpiryManager { event_ts_to_key: Default::default(), key_expiration_duration: Some(10), event_timestamp_from_row: Some(ScalarExpr::Column(0)), }; - let expire_state = Some(expire_state); - arr.expire_state = expire_state; + arr.expire_state = Some(expire_state); arr.full_arrangement = true; let arr = ArrangeHandler::from(arr); - let now = 0; - let key = Row::new(vec![1i64.into()]); + let updates: Vec = vec![ - ( - (Row::new(vec![1i64.into()]), Row::new(vec![2.into()])), - 1, - 1, - ), - ( - (Row::new(vec![2i64.into()]), Row::new(vec![3.into()])), - 2, - 1, - ), - ( - (Row::new(vec![3i64.into()]), Row::new(vec![4.into()])), - 3, - 1, - ), + (kv(lit(1i64), lit("x")), 1 /* ts */, 1 /* diff */), + (kv(lit(2i64), lit("y")), 2 /* ts */, 1 /* diff */), + (kv(lit(3i64), lit("z")), 3 /* ts */, 1 /* diff */), ]; { let mut arr = arr.write(); - arr.apply_updates(now, updates.clone()).unwrap(); + arr.apply_updates(0, updates.clone()).unwrap(); // repeat the same updates means having multiple updates for the same key - arr.apply_updates(now, updates).unwrap(); + arr.apply_updates(0, updates).unwrap(); + assert_eq!( arr.get_updates_in_range(1..=1), vec![ - ((key.clone(), Row::new(vec![2.into()])), 1, 1), - ((key.clone(), Row::new(vec![2.into()])), 1, 1) + (kv(lit(1i64), lit("x")), 1 /* ts */, 1 /* diff */), + (kv(lit(1i64), lit("x")), 1 /* ts */, 1 /* diff */) ] ); assert_eq!(arr.spine.len(), 3); - arr.compaction_to(1).unwrap(); + arr.compact_to(1).unwrap(); assert_eq!(arr.spine.len(), 3); } + { let mut arr = arr.write(); assert_eq!(arr.spine.len(), 3); 
- assert_eq!(arr.get(10, &key), Some((Row::new(vec![2.into()]), 1, 2))); - arr.trunc_expired(10); + + arr.truncate_expired_keys(11); assert_eq!(arr.spine.len(), 3); - arr.trunc_expired(11); - assert_eq!(arr.get(11, &key), None); + let key = &lit(1i64); + assert_eq!(arr.get(11, key), Some((lit("x"), 1 /* ts */, 2 /* diff */))); + let key = &lit(2i64); + assert_eq!(arr.get(11, key), Some((lit("y"), 2 /* ts */, 2 /* diff */))); + let key = &lit(3i64); + assert_eq!(arr.get(11, key), Some((lit("z"), 3 /* ts */, 2 /* diff */))); + + arr.truncate_expired_keys(12); assert_eq!(arr.spine.len(), 3); + let key = &lit(1i64); + assert_eq!(arr.get(12, key), None); + let key = &lit(2i64); + assert_eq!(arr.get(12, key), Some((lit("y"), 2 /* ts */, 2 /* diff */))); + let key = &lit(3i64); + assert_eq!(arr.get(12, key), Some((lit("z"), 3 /* ts */, 2 /* diff */))); assert_eq!(arr.expire_state.as_ref().unwrap().event_ts_to_key.len(), 2); - arr.trunc_expired(12); + + arr.truncate_expired_keys(13); assert_eq!(arr.spine.len(), 3); + let key = &lit(1i64); + assert_eq!(arr.get(13, key), None); + let key = &lit(2i64); + assert_eq!(arr.get(13, key), None); + let key = &lit(3i64); + assert_eq!(arr.get(13, key), Some((lit("z"), 3 /* ts */, 2 /* diff */))); assert_eq!(arr.expire_state.as_ref().unwrap().event_ts_to_key.len(), 1); } } @@ -668,86 +696,62 @@ mod test { #[test] fn test_apply_expired_keys() { // apply updates with a expired key - let mut arr = Arrangement::new(); + let mut arr = Arrangement::default(); let expire_state = KeyExpiryManager { event_ts_to_key: Default::default(), key_expiration_duration: Some(10), event_timestamp_from_row: Some(ScalarExpr::Column(0)), }; - let expire_state = Some(expire_state); - arr.expire_state = expire_state; + arr.expire_state = Some(expire_state); let arr = ArrangeHandler::from(arr); let updates: Vec = vec![ - ( - (Row::new(vec![1i64.into()]), Row::new(vec![2.into()])), - 1, - 1, - ), - ( - (Row::new(vec![2i64.into()]), Row::new(vec![3.into()])), - 
2, - 1, - ), - ( - (Row::new(vec![3i64.into()]), Row::new(vec![4.into()])), - 3, - 1, - ), - ( - (Row::new(vec![3i64.into()]), Row::new(vec![4.into()])), - 3, - 1, - ), - ( - (Row::new(vec![1i64.into()]), Row::new(vec![42.into()])), - 10, - 1, - ), + (kv(lit(1i64), lit("x")), 1 /* ts */, 1 /* diff */), + (kv(lit(2i64), lit("y")), 2 /* ts */, 1 /* diff */), ]; { let mut arr = arr.write(); - arr.apply_updates(11, updates).unwrap(); + let expired_by = arr.apply_updates(12, updates).unwrap(); + assert_eq!(expired_by, Some(1)); - assert_eq!( - arr.get(11, &Row::new(vec![1i64.into()])), - Some((Row::new(vec![42.into()]), 10, 1)) - ); - arr.trunc_expired(12); - assert_eq!(arr.get(12, &Row::new(vec![1i64.into()])), None); + let key = &lit(1i64); + assert_eq!(arr.get(12, key), None); + let key = &lit(2i64); + assert_eq!(arr.get(12, key), Some((lit("y"), 2 /* ts */, 1 /* diff */))); } } - /// test if split_lte get ranges that are not aligned with batch boundaries - /// this split_lte can correctly retrieve all updates in the range, including updates that are in the batches + /// test if split_spine_le get ranges that are not aligned with batch boundaries + /// this split_spine_le can correctly retrieve all updates in the range, including updates that are in the batches /// near the boundary of input range #[test] fn test_split_off() { - let mut arr = Arrangement::new(); + let mut arr = Arrangement::default(); // manually create batch ..=1 and 2..=3 - arr.spine.insert(1, Default::default()); - arr.spine.insert(3, Default::default()); - arr.apply_updates( - 2, - vec![((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 2, 1)], - ) - .unwrap(); + arr.spine.insert(1, Batch::default()); + arr.spine.insert(3, Batch::default()); + + let updates = vec![(kv(lit("a"), lit("x")), 2 /* ts */, 1 /* diff */)]; // updates falls into the range of 2..=3 + arr.apply_updates(2, updates).unwrap(); + let mut arr1 = arr.clone(); { assert_eq!(arr.get_next_update_time(&1), Some(2)); - // split 
expect to take batch ..=1 and create a new batch 2..=2(which contain update) - let split = &arr.split_lte(&2); + // split expect to take batch ..=1 and create a new batch 2..=2 (which contains update) + let split = &arr.split_spine_le(&2); assert_eq!(split.len(), 2); assert_eq!(split[&2].len(), 1); - let _ = &arr.split_lte(&3); + assert_eq!(arr.get_next_update_time(&1), None); } + { // take all updates with timestamp <=1, will get no updates - let split = &arr1.split_lte(&1); + let split = &arr1.split_spine_le(&1); assert_eq!(split.len(), 1); + assert_eq!(split[&1].len(), 0); } } @@ -755,71 +759,97 @@ mod test { /// whether can get correct result #[test] fn test_get_by_range() { - let mut arr = Arrangement::new(); + let mut arr = Arrangement::default(); // will form {2: [2, 1], 4: [4,3], 6: [6,5]} three batch // TODO(discord9): manually set batch let updates: Vec = vec![ - ((Row::new(vec![1i64.into()]), Row::empty()), 2, 1), - ((Row::new(vec![1i64.into()]), Row::empty()), 1, 1), - ((Row::new(vec![2i64.into()]), Row::empty()), 4, 1), - ((Row::new(vec![3i64.into()]), Row::empty()), 3, 1), - ((Row::new(vec![3i64.into()]), Row::empty()), 6, 1), - ((Row::new(vec![1i64.into()]), Row::empty()), 5, 1), + (kv(lit("a"), lit("")), 2 /* ts */, 1 /* diff */), + (kv(lit("a"), lit("")), 1 /* ts */, 1 /* diff */), + (kv(lit("b"), lit("")), 4 /* ts */, 1 /* diff */), + (kv(lit("c"), lit("")), 3 /* ts */, 1 /* diff */), + (kv(lit("c"), lit("")), 6 /* ts */, 1 /* diff */), + (kv(lit("a"), lit("")), 5 /* ts */, 1 /* diff */), ]; arr.apply_updates(0, updates).unwrap(); assert_eq!( arr.get_updates_in_range(2..=5), vec![ - ((Row::new(vec![1i64.into()]), Row::empty()), 2, 1), - ((Row::new(vec![2i64.into()]), Row::empty()), 4, 1), - ((Row::new(vec![3i64.into()]), Row::empty()), 3, 1), - ((Row::new(vec![1i64.into()]), Row::empty()), 5, 1), + (kv(lit("a"), lit("")), 2 /* ts */, 1 /* diff */), + (kv(lit("b"), lit("")), 4 /* ts */, 1 /* diff */), + (kv(lit("c"), lit("")), 3 /* ts */, 1 /* diff 
*/), + (kv(lit("a"), lit("")), 5 /* ts */, 1 /* diff */), ] ); } /// test if get with range unaligned with batch boundary - /// can get correct result + /// can get correct result #[test] fn test_get_unaligned() { - let mut arr = Arrangement::new(); + let mut arr = Arrangement::default(); // will form {2: [2, 1], 4: [4,3], 6: [6,5]} three batch // TODO(discord9): manually set batch - let key = Row::new(vec![1i64.into()]); + let key = &lit("a"); let updates: Vec = vec![ - ((key.clone(), Row::new(vec![1i64.into()])), 2, 1), - ((key.clone(), Row::new(vec![2i64.into()])), 1, 1), - ((key.clone(), Row::new(vec![3i64.into()])), 4, 1), - ((key.clone(), Row::new(vec![4i64.into()])), 3, 1), - ((key.clone(), Row::new(vec![5i64.into()])), 6, 1), - ((key.clone(), Row::new(vec![6i64.into()])), 5, 1), + (kv(key, lit(1)), 2 /* ts */, 1 /* diff */), + (kv(key, lit(2)), 1 /* ts */, 1 /* diff */), + (kv(key, lit(3)), 4 /* ts */, 1 /* diff */), + (kv(key, lit(4)), 3 /* ts */, 1 /* diff */), + (kv(key, lit(5)), 6 /* ts */, 1 /* diff */), + (kv(key, lit(6)), 5 /* ts */, 1 /* diff */), ]; arr.apply_updates(0, updates).unwrap(); // aligned with batch boundary - assert_eq!(arr.get(2, &key), Some((Row::new(vec![1i64.into()]), 2, 1))); + assert_eq!(arr.get(2, key), Some((lit(1), 2 /* ts */, 1 /* diff */))); // unaligned with batch boundary - assert_eq!(arr.get(3, &key), Some((Row::new(vec![4i64.into()]), 3, 1))); + assert_eq!(arr.get(3, key), Some((lit(4), 3 /* ts */, 1 /* diff */))); } /// test if out of order updates can be sorted correctly #[test] fn test_out_of_order_apply_updates() { - let mut arr = Arrangement::new(); + let mut arr = Arrangement::default(); - let key = Row::new(vec![1i64.into()]); + let key = &lit("a"); let updates: Vec = vec![ - ((key.clone(), Row::new(vec![5i64.into()])), 6, 1), - ((key.clone(), Row::new(vec![2i64.into()])), 2, -1), - ((key.clone(), Row::new(vec![1i64.into()])), 2, 1), - ((key.clone(), Row::new(vec![2i64.into()])), 1, 1), - ((key.clone(), 
Row::new(vec![3i64.into()])), 4, 1), - ((key.clone(), Row::new(vec![4i64.into()])), 3, 1), - ((key.clone(), Row::new(vec![6i64.into()])), 5, 1), + (kv(key, lit(5)), 6 /* ts */, 1 /* diff */), + (kv(key, lit(2)), 2 /* ts */, -1 /* diff */), + (kv(key, lit(1)), 2 /* ts */, 1 /* diff */), + (kv(key, lit(2)), 1 /* ts */, 1 /* diff */), + (kv(key, lit(3)), 4 /* ts */, 1 /* diff */), + (kv(key, lit(4)), 3 /* ts */, 1 /* diff */), + (kv(key, lit(6)), 5 /* ts */, 1 /* diff */), ]; arr.apply_updates(0, updates.clone()).unwrap(); - let sorted = updates.iter().sorted_by_key(|r| r.1).cloned().collect_vec(); + let sorted = updates + .iter() + .sorted_by_key(|(_, ts, _)| *ts) + .cloned() + .collect_vec(); assert_eq!(arr.get_updates_in_range(1..7), sorted); } + + #[test] + fn test_full_arrangement_get_from_first_entry() { + let mut arr = Arrangement::default(); + // will form {3: [1, 2, 3]} + let updates = vec![ + (kv(lit("a"), lit("x")), 3 /* ts */, 1 /* diff */), + (kv(lit("b"), lit("y")), 1 /* ts */, 1 /* diff */), + (kv(lit("b"), lit("y")), 2 /* ts */, -1 /* diff */), + ]; + arr.apply_updates(0, updates).unwrap(); + assert_eq!(arr.get(2, &lit("b")), None /* deleted */); + arr.full_arrangement = true; + assert_eq!(arr.get(2, &lit("b")), None /* still deleted */); + + arr.compact_to(1).unwrap(); + + assert_eq!( + arr.get(1, &lit("b")), + Some((lit("y"), 1, 1)) /* fast path */ + ); + } }