mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
fix(flow): Arrange get range with batch unaligned (#3552)
* fix: Arrange get range with batch unaligned * chore: per review * refactor: sort at apply_updates
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::ops::Bound;
|
||||
use std::sync::Arc;
|
||||
|
||||
use itertools::Itertools;
|
||||
@@ -24,11 +25,12 @@ use crate::expr::error::InternalSnafu;
|
||||
use crate::expr::{EvalError, ScalarExpr};
|
||||
use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};
|
||||
|
||||
pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
|
||||
pub type Spine = BTreeMap<Timestamp, Batch>;
|
||||
|
||||
/// Determine when a key should expire according to its event timestamp in the key,
|
||||
/// if a key is expired, any future updates to it should be ignored
|
||||
/// Note that a key is expired by its event timestamp (contained in the key), not by the time it was inserted (system timestamp)
|
||||
///
|
||||
/// TODO(discord9): find a better way to handle key expiration, like write to disk or something instead of throw away
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
|
||||
pub struct KeyExpiryManager {
|
||||
/// a map from event timestamp to key, used for expire keys
|
||||
@@ -121,13 +123,16 @@ pub struct Arrangement {
|
||||
/// And for the consolidated batch (i.e. the batch representing now), there should be only one update for each key with `diff==1`
|
||||
///
|
||||
/// And since most of the time a key gets updated by first a delete then an insert, a small vec with size of 2 makes sense
|
||||
spine: BTreeMap<Timestamp, BTreeMap<Row, SmallVec<[DiffRow; 2]>>>,
|
||||
/// TODO: batch size balancing?
|
||||
spine: Spine,
|
||||
/// if set to false, will not update current value of the arrangement, useful for case like `map -> arrange -> reduce`
|
||||
full_arrangement: bool,
|
||||
/// flag to mark that this arrangement hasn't been written to, so that it can be cloned and shared
|
||||
is_written: bool,
|
||||
/// manage the expire state of the arrangement
|
||||
expire_state: Option<KeyExpiryManager>,
|
||||
/// the time that the last compaction happened, also known as current time
|
||||
last_compaction_time: Option<Timestamp>,
|
||||
}
|
||||
|
||||
impl Arrangement {
|
||||
@@ -137,6 +142,7 @@ impl Arrangement {
|
||||
full_arrangement: false,
|
||||
is_written: false,
|
||||
expire_state: None,
|
||||
last_compaction_time: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,6 +166,7 @@ impl Arrangement {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// the first batch with key that's greater or equal to ts
|
||||
let batch = if let Some((_, batch)) = self.spine.range_mut(ts..).next() {
|
||||
batch
|
||||
@@ -171,20 +178,87 @@ impl Arrangement {
|
||||
{
|
||||
let key_updates = batch.entry(key).or_insert(smallvec![]);
|
||||
key_updates.push((val, ts, diff));
|
||||
// a stable sort make updates sort in order of insertion
|
||||
// without changing the order of updates within same tick
|
||||
key_updates.sort_by_key(|r| r.1);
|
||||
}
|
||||
}
|
||||
Ok(max_late_by)
|
||||
}
|
||||
|
||||
/// find out the time of next update in the future
|
||||
/// that is the next update with `timestamp > now`
|
||||
pub fn get_next_update_time(&self, now: &Timestamp) -> Option<Timestamp> {
|
||||
// iter over batches that only have updates of `timestamp>now` and find the first non empty batch, then get the minimum timestamp in that batch
|
||||
let next_batches = self.spine.range((Bound::Excluded(now), Bound::Unbounded));
|
||||
for (_ts, batch) in next_batches {
|
||||
let min_ts = batch
|
||||
.iter()
|
||||
.flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts))
|
||||
.min();
|
||||
if let Some(min_ts) = min_ts {
|
||||
return Some(min_ts);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
// all batches are empty, return now
|
||||
None
|
||||
}
|
||||
|
||||
/// get the last compaction time
|
||||
pub fn get_compaction(&self) -> Option<Timestamp> {
|
||||
self.last_compaction_time
|
||||
}
|
||||
|
||||
/// split spine off at `now`, and return the spine that's before `now`(including `now`)
|
||||
fn split_lte(&mut self, now: &Timestamp) -> Spine {
|
||||
let mut before = self.spine.split_off(&(now + 1));
|
||||
std::mem::swap(&mut before, &mut self.spine);
|
||||
|
||||
// if before's last key == now, then all the keys we needed are found
|
||||
if before
|
||||
.last_key_value()
|
||||
.map(|(k, _v)| *k == *now)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return before;
|
||||
}
|
||||
|
||||
// also need to move all keys from the first batch in spine with timestamp<=now to before
|
||||
// we know that all remaining keys to be split off are last key < key <= now, we will make them into a new batch
|
||||
if let Some(mut first_batch) = self.spine.first_entry() {
|
||||
let mut new_batch: Batch = Default::default();
|
||||
// remove all keys with val of empty vec
|
||||
first_batch.get_mut().retain(|key, updates| {
|
||||
// remove keys <= now from updates
|
||||
updates.retain(|(val, ts, diff)| {
|
||||
if *ts <= *now {
|
||||
new_batch.entry(key.clone()).or_insert(smallvec![]).push((
|
||||
val.clone(),
|
||||
*ts,
|
||||
*diff,
|
||||
));
|
||||
}
|
||||
*ts > *now
|
||||
});
|
||||
!updates.is_empty()
|
||||
});
|
||||
|
||||
before.entry(*now).or_default().extend(new_batch);
|
||||
}
|
||||
before
|
||||
}
|
||||
|
||||
/// advance time to `now` and consolidate all older(`now` included) updates to the first key
|
||||
///
|
||||
/// return the maximum expire time(already expire by how much time) of all updates if any keys is already expired
|
||||
pub fn set_compaction(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
|
||||
let mut max_late_by: Option<Duration> = None;
|
||||
|
||||
let mut should_compact = self.spine.split_off(&(now + 1));
|
||||
std::mem::swap(&mut should_compact, &mut self.spine);
|
||||
let should_compact = self.split_lte(&now);
|
||||
|
||||
self.last_compaction_time = Some(now);
|
||||
// if a full arrangement is not needed, we can just discard everything before and including now
|
||||
if !self.full_arrangement {
|
||||
return Ok(None);
|
||||
@@ -221,18 +295,62 @@ impl Arrangement {
|
||||
}
|
||||
|
||||
/// get the updates of the arrangement from the given range of time
|
||||
pub fn get_updates_in_range<R: std::ops::RangeBounds<Timestamp>>(
|
||||
pub fn get_updates_in_range<R: std::ops::RangeBounds<Timestamp> + Clone>(
|
||||
&self,
|
||||
range: R,
|
||||
) -> Vec<KeyValDiffRow> {
|
||||
let mut result = vec![];
|
||||
for (_ts, batch) in self.spine.range(range) {
|
||||
for (key, updates) in batch.clone() {
|
||||
for (val, ts, diff) in updates {
|
||||
result.push(((key.clone(), val), ts, diff));
|
||||
// three part:
|
||||
// 1.the starting batch with first key >= range.start, which may contain updates that not in range
|
||||
// 2. the batches with key in range
|
||||
// 3. the last batch with first key > range.end, which may contain updates that are in range
|
||||
let mut is_first = true;
|
||||
for (_ts, batch) in self.spine.range(range.clone()) {
|
||||
if is_first {
|
||||
for (key, updates) in batch {
|
||||
let iter = updates
|
||||
.iter()
|
||||
.filter(|(_val, ts, _diff)| range.contains(ts))
|
||||
.map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
|
||||
result.extend(iter);
|
||||
}
|
||||
is_first = false;
|
||||
} else {
|
||||
for (key, updates) in batch.clone() {
|
||||
result.extend(
|
||||
updates
|
||||
.iter()
|
||||
.map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff)),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// deal with boundary include start and end
|
||||
// and for the next batch with upper_bound >= range.end
|
||||
// we need to search for updates within range
|
||||
let neg_bound = match range.end_bound() {
|
||||
Bound::Included(b) => {
|
||||
// if boundary is aligned, the last batch in range actually cover the full range
|
||||
// then there will be no further keys we need in the next batch
|
||||
if self.spine.contains_key(b) {
|
||||
return result;
|
||||
}
|
||||
Bound::Excluded(*b)
|
||||
}
|
||||
Bound::Excluded(b) => Bound::Included(*b),
|
||||
Bound::Unbounded => return result,
|
||||
};
|
||||
let search_range = (neg_bound, Bound::Unbounded);
|
||||
if let Some(last_batch) = self.spine.range(search_range).next() {
|
||||
for (key, updates) in last_batch.1 {
|
||||
let iter = updates
|
||||
.iter()
|
||||
.filter(|(_val, ts, _diff)| range.contains(ts))
|
||||
.map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
|
||||
result.extend(iter);
|
||||
}
|
||||
};
|
||||
result
|
||||
}
|
||||
|
||||
@@ -260,11 +378,12 @@ impl Arrangement {
|
||||
/// get current state of things
|
||||
/// useful for query existing keys(i.e. reduce and join operator need to query existing state)
|
||||
pub fn get(&self, now: Timestamp, key: &Row) -> Option<(Row, Timestamp, Diff)> {
|
||||
if self
|
||||
.spine
|
||||
.first_key_value()
|
||||
.map(|(ts, _)| *ts >= now)
|
||||
.unwrap_or(false)
|
||||
if self.full_arrangement
|
||||
&& self
|
||||
.spine
|
||||
.first_key_value()
|
||||
.map(|(ts, _)| *ts >= now)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
self.spine
|
||||
.first_key_value()
|
||||
@@ -272,10 +391,38 @@ impl Arrangement {
|
||||
} else {
|
||||
// check keys <= now to know current value
|
||||
let mut final_val = None;
|
||||
for (_ts, batch) in self.spine.range(..=now) {
|
||||
|
||||
let with_extra_batch = {
|
||||
let unaligned = self.spine.range(..=now);
|
||||
if unaligned
|
||||
.clone()
|
||||
.last()
|
||||
.map(|(ts, _)| *ts == now)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
// this extra chain is there just to make type the same
|
||||
unaligned.chain(None)
|
||||
} else {
|
||||
// if the last key is not equal to now, then we need to include the next batch
|
||||
// because we know last batch key < now < next batch key
|
||||
// therefore next batch may contain updates that we want
|
||||
unaligned.chain(
|
||||
self.spine
|
||||
.range((Bound::Excluded(now), Bound::Unbounded))
|
||||
.next(),
|
||||
)
|
||||
}
|
||||
};
|
||||
for (ts, batch) in with_extra_batch {
|
||||
if let Some(new_rows) = batch.get(key).map(|v| v.iter()) {
|
||||
for new_row in new_rows {
|
||||
final_val = compact_diff_row(final_val, new_row);
|
||||
if *ts <= now {
|
||||
for new_row in new_rows {
|
||||
final_val = compact_diff_row(final_val, new_row);
|
||||
}
|
||||
} else {
|
||||
for new_row in new_rows.filter(|new_row| new_row.1 <= now) {
|
||||
final_val = compact_diff_row(final_val, new_row);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -530,4 +677,108 @@ mod test {
|
||||
assert_eq!(arr.get(12, &Row::new(vec![1i64.into()])), None);
|
||||
}
|
||||
}
|
||||
|
||||
/// `split_lte` is called with split points that are not aligned with batch boundaries
/// and must still return every update at or before the split point, including the ones
/// stored in the batch right after it
#[test]
fn test_split_off() {
    let mut arr = Arrangement::new();
    // hand-made batches keyed at 1 and 3, i.e. covering ..=1 and 2..=3
    arr.spine.insert(1, Default::default());
    arr.spine.insert(3, Default::default());

    // a single update at timestamp 2, which falls into the range of batch 2..=3
    let update: KeyValDiffRow = ((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 2, 1);
    arr.apply_updates(2, vec![update]).unwrap();

    let mut arr1 = arr.clone();
    {
        assert_eq!(arr.get_next_update_time(&1), Some(2));
        // splitting at 2 is expected to take batch ..=1 and to create
        // a new batch 2..=2 which contains the update
        let taken = arr.split_lte(&2);
        assert_eq!(taken.len(), 2);
        assert_eq!(taken[&2].len(), 1);
        // after also splitting at 3 there is no update left to report
        let _ = arr.split_lte(&3);
        assert_eq!(arr.get_next_update_time(&1), None);
    }
    {
        // taking everything with timestamp <= 1 only yields the (empty) batch ..=1
        let taken = arr1.split_lte(&1);
        assert_eq!(taken.len(), 1);
    }
}
|
||||
|
||||
/// query a time range whose bounds are not aligned with batch boundaries
/// and check that exactly the updates inside the range come back
#[test]
fn test_get_by_range() {
    let mut arr = Arrangement::new();

    // an insertion of `key` with an empty value at timestamp `ts`
    let insert_at = |key: i64, ts| -> KeyValDiffRow {
        ((Row::new(vec![key.into()]), Row::empty()), ts, 1)
    };

    // will form {2: [2, 1], 4: [4,3], 6: [6,5]} three batch
    // TODO(discord9): manually set batch
    let updates = vec![
        insert_at(1, 2),
        insert_at(1, 1),
        insert_at(2, 4),
        insert_at(3, 3),
        insert_at(3, 6),
        insert_at(1, 5),
    ];
    arr.apply_updates(0, updates).unwrap();

    let expected = vec![
        insert_at(1, 2),
        insert_at(2, 4),
        insert_at(3, 3),
        insert_at(1, 5),
    ];
    assert_eq!(arr.get_updates_in_range(2..=5), expected);
}
|
||||
|
||||
/// `get` must return the right value both when `now` sits on a batch boundary
/// and when it falls in between two boundaries
#[test]
fn test_get_unaligned() {
    let mut arr = Arrangement::new();
    let key = Row::new(vec![1i64.into()]);

    // an insertion of value `val` for `key` at timestamp `ts`
    let insert_at = |val: i64, ts| -> KeyValDiffRow {
        ((key.clone(), Row::new(vec![val.into()])), ts, 1)
    };

    // will form {2: [2, 1], 4: [4,3], 6: [6,5]} three batch
    // TODO(discord9): manually set batch
    let updates = vec![
        insert_at(1, 2),
        insert_at(2, 1),
        insert_at(3, 4),
        insert_at(4, 3),
        insert_at(5, 6),
        insert_at(6, 5),
    ];
    arr.apply_updates(0, updates).unwrap();

    // `now` aligned with a batch boundary
    let aligned = arr.get(2, &key);
    assert_eq!(aligned, Some((Row::new(vec![1i64.into()]), 2, 1)));
    // `now` unaligned with any batch boundary
    let unaligned = arr.get(3, &key);
    assert_eq!(unaligned, Some((Row::new(vec![4i64.into()]), 3, 1)));
}
|
||||
|
||||
/// updates applied out of timestamp order must be read back sorted by timestamp,
/// with updates sharing a timestamp kept in their order of insertion
#[test]
fn test_out_of_order_apply_updates() {
    let mut arr = Arrangement::new();
    let key = Row::new(vec![1i64.into()]);

    // (value, timestamp, diff) triples, deliberately not ordered by timestamp
    let updates: Vec<KeyValDiffRow> = vec![
        (5i64, 6, 1),
        (2, 2, -1),
        (1, 2, 1),
        (2, 1, 1),
        (3, 4, 1),
        (4, 3, 1),
        (6, 5, 1),
    ]
    .into_iter()
    .map(|(val, ts, diff)| ((key.clone(), Row::new(vec![val.into()])), ts, diff))
    .collect();
    arr.apply_updates(0, updates.clone()).unwrap();

    // a stable sort by timestamp gives the order the arrangement is expected to return
    let mut expected = updates;
    expected.sort_by_key(|update| update.1);
    assert_eq!(arr.get_updates_in_range(1..7), expected);
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user