mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: divide subtasks from old/new partition rules (#7003)
* feat: divide subtasks from old/new partition rules Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix format Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * change copyright year Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * simplify filter Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * naming Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * Update src/partition/src/subtask.rs Co-authored-by: Zhenchi <zhongzc_arch@outlook.com> Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
@@ -22,7 +22,9 @@ pub mod error;
|
||||
pub mod expr;
|
||||
pub mod manager;
|
||||
pub mod multi_dim;
|
||||
pub mod overlap;
|
||||
pub mod partition;
|
||||
pub mod splitter;
|
||||
pub mod subtask;
|
||||
|
||||
pub use crate::partition::{PartitionRule, PartitionRuleRef};
|
||||
|
||||
334
src/partition/src/overlap.rs
Normal file
334
src/partition/src/overlap.rs
Normal file
@@ -0,0 +1,334 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Rule overlap and association utilities.
|
||||
//!
|
||||
//! This module provides pure functions to determine overlap relationships between
|
||||
//! partition expressions and to associate rule sets.
|
||||
|
||||
use std::cmp::Ordering;
|
||||
use std::ops::Bound;
|
||||
|
||||
use datatypes::value::OrderedF64;
|
||||
|
||||
use crate::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr};
|
||||
use crate::error::Result;
|
||||
use crate::expr::PartitionExpr;
|
||||
|
||||
/// Check if two atomic expressions can be both satisfied, i.e., whether they
|
||||
/// overlap on all common columns.
|
||||
pub fn atomic_exprs_overlap(lhs: &AtomicExpr, rhs: &AtomicExpr) -> bool {
|
||||
// Merge-walk over columns since nucleons are sorted by (column, op, value)
|
||||
let mut lhs_index = 0;
|
||||
let mut rhs_index = 0;
|
||||
|
||||
while lhs_index < lhs.nucleons.len() && rhs_index < rhs.nucleons.len() {
|
||||
let lhs_col = lhs.nucleons[lhs_index].column();
|
||||
let rhs_col = rhs.nucleons[rhs_index].column();
|
||||
|
||||
match lhs_col.cmp(rhs_col) {
|
||||
Ordering::Equal => {
|
||||
// advance to the next column boundaries in both atomics
|
||||
let mut lhs_next = lhs_index;
|
||||
let mut rhs_next = rhs_index;
|
||||
while lhs_next < lhs.nucleons.len() && lhs.nucleons[lhs_next].column() == lhs_col {
|
||||
lhs_next += 1;
|
||||
}
|
||||
while rhs_next < rhs.nucleons.len() && rhs.nucleons[rhs_next].column() == rhs_col {
|
||||
rhs_next += 1;
|
||||
}
|
||||
|
||||
let lhs_range = nucleons_to_range(&lhs.nucleons[lhs_index..lhs_next]);
|
||||
let rhs_range = nucleons_to_range(&rhs.nucleons[rhs_index..rhs_next]);
|
||||
|
||||
if !lhs_range.overlaps_with(&rhs_range) {
|
||||
return false;
|
||||
}
|
||||
|
||||
lhs_index = lhs_next;
|
||||
rhs_index = rhs_next;
|
||||
}
|
||||
Ordering::Less => {
|
||||
// column appears only in `lhs`, skip all nucleons for this column
|
||||
let col = lhs_col;
|
||||
while lhs_index < lhs.nucleons.len() && lhs.nucleons[lhs_index].column() == col {
|
||||
lhs_index += 1;
|
||||
}
|
||||
}
|
||||
Ordering::Greater => {
|
||||
// column appears only in `rhs`, skip all nucleons for this column
|
||||
let col = rhs_col;
|
||||
while rhs_index < rhs.nucleons.len() && rhs.nucleons[rhs_index].column() == col {
|
||||
rhs_index += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
/// Pairwise overlap check between two expression lists.
|
||||
///
|
||||
/// Returns true if two [`PartitionExpr`]s are overlapping (any pair of atomics overlaps).
|
||||
fn expr_pair_overlap(lhs: &PartitionExpr, rhs: &PartitionExpr) -> Result<bool> {
|
||||
let binding = [lhs.clone(), rhs.clone()];
|
||||
let collider = Collider::new(&binding)?;
|
||||
// Split atomic exprs by source index
|
||||
let mut lhs_atoms = Vec::new();
|
||||
let mut rhs_atoms = Vec::new();
|
||||
for atomic in collider.atomic_exprs.iter() {
|
||||
if atomic.source_expr_index == 0 {
|
||||
lhs_atoms.push(atomic);
|
||||
} else {
|
||||
rhs_atoms.push(atomic);
|
||||
}
|
||||
}
|
||||
for lhs_atomic in &lhs_atoms {
|
||||
for rhs_atomic in &rhs_atoms {
|
||||
if atomic_exprs_overlap(lhs_atomic, rhs_atomic) {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
/// Associates each expression in `from_exprs` with indices of overlapping expressions in `to_exprs`.
|
||||
///
|
||||
/// Output vector length equals `from_exprs.len()`, and each inner vector contains indices into
|
||||
/// `to_exprs` that overlap with the corresponding `from_exprs[i]`.
|
||||
pub fn associate_from_to(
|
||||
from_exprs: &[PartitionExpr],
|
||||
to_exprs: &[PartitionExpr],
|
||||
) -> Result<Vec<Vec<usize>>> {
|
||||
let mut result = Vec::with_capacity(from_exprs.len());
|
||||
for from in from_exprs.iter() {
|
||||
let mut targets = Vec::new();
|
||||
for (i, to) in to_exprs.iter().enumerate() {
|
||||
if expr_pair_overlap(from, to)? {
|
||||
targets.push(i);
|
||||
}
|
||||
}
|
||||
result.push(targets);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Represents a value range derived from a group of nucleons for the same column
|
||||
#[derive(Debug, Clone)]
|
||||
struct ValueRange {
|
||||
lower: Bound<OrderedF64>,
|
||||
upper: Bound<OrderedF64>,
|
||||
}
|
||||
|
||||
impl ValueRange {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
lower: Bound::Unbounded,
|
||||
upper: Bound::Unbounded,
|
||||
}
|
||||
}
|
||||
|
||||
fn update_lower(&mut self, new_lower: Bound<OrderedF64>) {
|
||||
match (&self.lower, &new_lower) {
|
||||
(Bound::Unbounded, _) => self.lower = new_lower,
|
||||
(_, Bound::Unbounded) => {}
|
||||
(Bound::Included(cur), Bound::Included(new))
|
||||
| (Bound::Excluded(cur), Bound::Included(new))
|
||||
| (Bound::Included(cur), Bound::Excluded(new))
|
||||
| (Bound::Excluded(cur), Bound::Excluded(new)) => {
|
||||
if new > cur {
|
||||
self.lower = new_lower;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn update_upper(&mut self, new_upper: Bound<OrderedF64>) {
|
||||
match (&self.upper, &new_upper) {
|
||||
(Bound::Unbounded, _) => self.upper = new_upper,
|
||||
(_, Bound::Unbounded) => {}
|
||||
(Bound::Included(cur), Bound::Included(new))
|
||||
| (Bound::Excluded(cur), Bound::Included(new))
|
||||
| (Bound::Included(cur), Bound::Excluded(new))
|
||||
| (Bound::Excluded(cur), Bound::Excluded(new)) => {
|
||||
if new < cur {
|
||||
self.upper = new_upper;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn overlaps_with(&self, other: &Self) -> bool {
|
||||
fn no_overlap(upper: &Bound<OrderedF64>, lower: &Bound<OrderedF64>) -> bool {
|
||||
match (upper, lower) {
|
||||
(Bound::Unbounded, _) | (_, Bound::Unbounded) => false,
|
||||
// u], [l
|
||||
(Bound::Included(u), Bound::Included(l)) => u < l,
|
||||
// u], (l) or u), [l or u), (l)
|
||||
(Bound::Included(u), Bound::Excluded(l))
|
||||
| (Bound::Excluded(u), Bound::Included(l))
|
||||
| (Bound::Excluded(u), Bound::Excluded(l)) => u <= l,
|
||||
}
|
||||
}
|
||||
|
||||
if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) {
|
||||
return false;
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert nucleons for the same column into a ValueRange
|
||||
fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange {
|
||||
use GluonOp::*;
|
||||
|
||||
let mut range = ValueRange::new();
|
||||
for n in nucleons {
|
||||
let v = n.value();
|
||||
match n.op() {
|
||||
Eq => {
|
||||
range.lower = Bound::Included(v);
|
||||
range.upper = Bound::Included(v);
|
||||
break;
|
||||
}
|
||||
Lt => range.update_upper(Bound::Excluded(v)),
|
||||
LtEq => range.update_upper(Bound::Included(v)),
|
||||
Gt => range.update_lower(Bound::Excluded(v)),
|
||||
GtEq => range.update_lower(Bound::Included(v)),
|
||||
NotEq => continue, // handled elsewhere as separate atomics
|
||||
}
|
||||
}
|
||||
range
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datatypes::value::Value;
|
||||
|
||||
use super::*;
|
||||
use crate::expr::{Operand, PartitionExpr, RestrictedOp, col};
|
||||
|
||||
#[test]
|
||||
fn test_pair_overlap_simple() {
|
||||
let a = col("user_id")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("user_id").lt(Value::Int64(100)));
|
||||
let b = col("user_id").eq(Value::Int64(50));
|
||||
assert!(expr_pair_overlap(&a, &b).unwrap());
|
||||
|
||||
let c = col("user_id")
|
||||
.gt_eq(Value::Int64(100))
|
||||
.and(col("user_id").lt(Value::Int64(200)));
|
||||
assert!(!expr_pair_overlap(&a, &c).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_associate_from_to() {
|
||||
// from: [ [0,100), [100,200) ]
|
||||
let from = vec![
|
||||
col("user_id")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("user_id").lt(Value::Int64(100))),
|
||||
col("user_id")
|
||||
.gt_eq(Value::Int64(100))
|
||||
.and(col("user_id").lt(Value::Int64(200))),
|
||||
];
|
||||
// to: [ [0,150), [150,300) ]
|
||||
let to = vec![
|
||||
col("user_id")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("user_id").lt(Value::Int64(150))),
|
||||
col("user_id")
|
||||
.gt_eq(Value::Int64(150))
|
||||
.and(col("user_id").lt(Value::Int64(300))),
|
||||
];
|
||||
let assoc = associate_from_to(&from, &to).unwrap();
|
||||
assert_eq!(assoc.len(), 2);
|
||||
// [0,100) overlaps only with [0,150)
|
||||
assert_eq!(assoc[0], vec![0]);
|
||||
// [100,200) overlaps both [0,150) and [150,300)
|
||||
assert_eq!(assoc[1], vec![0, 1]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_expr_with_or() {
|
||||
// a: (user_id = 10 OR user_id = 20)
|
||||
let a = PartitionExpr::new(
|
||||
Operand::Expr(col("user_id").eq(Value::Int64(10))),
|
||||
RestrictedOp::Or,
|
||||
Operand::Expr(col("user_id").eq(Value::Int64(20))),
|
||||
);
|
||||
let b = col("user_id")
|
||||
.gt_eq(Value::Int64(15))
|
||||
.and(col("user_id").lt_eq(Value::Int64(25)));
|
||||
assert!(expr_pair_overlap(&a, &b).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_adjacent_ranges_non_overlap() {
|
||||
// [0, 100) vs [100, 200) -> no overlap
|
||||
let from = vec![
|
||||
col("k")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("k").lt(Value::Int64(100))),
|
||||
];
|
||||
let to = vec![
|
||||
col("k")
|
||||
.gt_eq(Value::Int64(100))
|
||||
.and(col("k").lt(Value::Int64(200))),
|
||||
];
|
||||
let assoc = associate_from_to(&from, &to).unwrap();
|
||||
assert_eq!(assoc[0], Vec::<usize>::new());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multi_column_conflict_no_overlap() {
|
||||
// Left: a in [0,10) AND b >= 5
|
||||
let left = col("a")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("a").lt(Value::Int64(10)))
|
||||
.and(col("b").gt_eq(Value::Int64(5)));
|
||||
// Right: a = 9 AND b < 5 -> conflict on b
|
||||
let right = col("a")
|
||||
.eq(Value::Int64(9))
|
||||
.and(col("b").lt(Value::Int64(5)));
|
||||
assert!(!expr_pair_overlap(&left, &right).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_disjoint_columns_overlap() {
|
||||
// Different columns don't constrain each other => satisfiable together
|
||||
let from = vec![col("a").eq(Value::Int64(1))];
|
||||
let to = vec![col("b").eq(Value::Int64(2))];
|
||||
let assoc = associate_from_to(&from, &to).unwrap();
|
||||
assert_eq!(assoc[0], vec![0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_boundary_inclusive_exclusive() {
|
||||
// Left: a <= 10 AND a >= 10 => a = 10
|
||||
let left = col("a")
|
||||
.lt_eq(Value::Int64(10))
|
||||
.and(col("a").gt_eq(Value::Int64(10)));
|
||||
// Right: a = 10 -> overlap
|
||||
let right_eq = col("a").eq(Value::Int64(10));
|
||||
assert!(expr_pair_overlap(&left, &right_eq).unwrap());
|
||||
|
||||
// Left: a < 10, Right: a = 10 -> no overlap
|
||||
let left_lt = col("a").lt(Value::Int64(10));
|
||||
assert!(!expr_pair_overlap(&left_lt, &right_eq).unwrap());
|
||||
}
|
||||
}
|
||||
306
src/partition/src/subtask.rs
Normal file
306
src/partition/src/subtask.rs
Normal file
@@ -0,0 +1,306 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::expr::PartitionExpr;
|
||||
use crate::overlap::associate_from_to;
|
||||
|
||||
/// Indices are into the original input arrays (array of [`PartitionExpr`]). A connected component.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RepartitionSubtask {
|
||||
pub from_expr_indices: Vec<usize>,
|
||||
pub to_expr_indices: Vec<usize>,
|
||||
/// For each `from_expr_indices[k]`, the corresponding vector contains global
|
||||
/// `to_expr_indices` that overlap with it (indices into the original `to_exprs`).
|
||||
pub transition_map: Vec<Vec<usize>>,
|
||||
}
|
||||
|
||||
/// Create independent subtasks out of given FROM/TO partition expressions.
|
||||
pub fn create_subtasks(
|
||||
from_exprs: &[PartitionExpr],
|
||||
to_exprs: &[PartitionExpr],
|
||||
) -> Result<Vec<RepartitionSubtask>> {
|
||||
// FROM -> TO
|
||||
let assoc = associate_from_to(from_exprs, to_exprs)?;
|
||||
if !assoc.iter().any(|v| !v.is_empty()) {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// TO -> FROM
|
||||
let mut rev = vec![Vec::new(); to_exprs.len()];
|
||||
for (li, rights) in assoc.iter().enumerate() {
|
||||
for &r in rights {
|
||||
rev[r].push(li);
|
||||
}
|
||||
}
|
||||
|
||||
// FROM(left), TO(right). Undirected
|
||||
let mut visited_left = vec![false; from_exprs.len()];
|
||||
let mut visited_right = vec![false; to_exprs.len()];
|
||||
let mut subtasks = Vec::new();
|
||||
|
||||
for li in 0..from_exprs.len() {
|
||||
if assoc[li].is_empty() || visited_left[li] {
|
||||
continue;
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
enum Node {
|
||||
Left(usize),
|
||||
Right(usize),
|
||||
}
|
||||
let mut left_set = Vec::new();
|
||||
let mut right_set = Vec::new();
|
||||
let mut queue = VecDeque::new();
|
||||
|
||||
visited_left[li] = true;
|
||||
queue.push_back(Node::Left(li));
|
||||
|
||||
while let Some(node) = queue.pop_front() {
|
||||
match node {
|
||||
Node::Left(left) => {
|
||||
left_set.push(left);
|
||||
for &r in &assoc[left] {
|
||||
if !visited_right[r] {
|
||||
visited_right[r] = true;
|
||||
queue.push_back(Node::Right(r));
|
||||
}
|
||||
}
|
||||
}
|
||||
Node::Right(right) => {
|
||||
right_set.push(right);
|
||||
for &l in &rev[right] {
|
||||
if !visited_left[l] {
|
||||
visited_left[l] = true;
|
||||
queue.push_back(Node::Left(l));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
left_set.sort_unstable();
|
||||
right_set.sort_unstable();
|
||||
|
||||
let transition_map = left_set
|
||||
.iter()
|
||||
.map(|&i| assoc[i].clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
subtasks.push(RepartitionSubtask {
|
||||
from_expr_indices: left_set,
|
||||
to_expr_indices: right_set,
|
||||
transition_map,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(subtasks)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datatypes::value::Value;
|
||||
|
||||
use super::*;
|
||||
use crate::expr::col;
|
||||
#[test]
|
||||
fn test_split_one_to_two() {
|
||||
// Left: [0, 40)
|
||||
let from = vec![
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("u").lt(Value::Int64(20))),
|
||||
];
|
||||
|
||||
// Right: [0, 10), [10, 20)
|
||||
let to = vec![
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("u").lt(Value::Int64(10))),
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(10))
|
||||
.and(col("u").lt(Value::Int64(20))),
|
||||
];
|
||||
|
||||
let subtasks = create_subtasks(&from, &to).unwrap();
|
||||
assert_eq!(subtasks.len(), 1);
|
||||
assert_eq!(subtasks[0].from_expr_indices, vec![0]);
|
||||
assert_eq!(subtasks[0].to_expr_indices, vec![0, 1]);
|
||||
assert_eq!(subtasks[0].transition_map[0], vec![0, 1]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_two_to_one() {
|
||||
// Left: [0, 10), [10, 20)
|
||||
let from = vec![
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("u").lt(Value::Int64(10))),
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(10))
|
||||
.and(col("u").lt(Value::Int64(20))),
|
||||
];
|
||||
// Right: [0, 40)
|
||||
let to = vec![
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("u").lt(Value::Int64(20))),
|
||||
];
|
||||
|
||||
let subtasks = create_subtasks(&from, &to).unwrap();
|
||||
assert_eq!(subtasks.len(), 1);
|
||||
assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]);
|
||||
assert_eq!(subtasks[0].to_expr_indices, vec![0]);
|
||||
assert_eq!(subtasks[0].transition_map[0], vec![0]);
|
||||
assert_eq!(subtasks[0].transition_map[1], vec![0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_subtasks_disconnected() {
|
||||
// Left: A:[0,10), B:[20,30)
|
||||
let from = vec![
|
||||
col("x")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("x").lt(Value::Int64(10))),
|
||||
col("x")
|
||||
.gt_eq(Value::Int64(20))
|
||||
.and(col("x").lt(Value::Int64(30))),
|
||||
];
|
||||
// Right: C:[5,15), D:[40,50)
|
||||
let to = vec![
|
||||
col("x")
|
||||
.gt_eq(Value::Int64(5))
|
||||
.and(col("x").lt(Value::Int64(15))),
|
||||
col("x")
|
||||
.gt_eq(Value::Int64(40))
|
||||
.and(col("x").lt(Value::Int64(50))),
|
||||
];
|
||||
|
||||
let subtasks = create_subtasks(&from, &to).unwrap();
|
||||
|
||||
// Expect two components: {A,C} and {B} has no edges so filtered out
|
||||
// Note: nodes with no edges are excluded by construction
|
||||
assert_eq!(subtasks.len(), 1);
|
||||
assert_eq!(subtasks[0].from_expr_indices, vec![0]);
|
||||
assert_eq!(subtasks[0].to_expr_indices, vec![0]);
|
||||
assert_eq!(subtasks[0].transition_map, vec![vec![0]]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_subtasks_multi() {
|
||||
// Left: [0,100), [100,200)
|
||||
let from = vec![
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("u").lt(Value::Int64(100))),
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(100))
|
||||
.and(col("u").lt(Value::Int64(200))),
|
||||
];
|
||||
// Right: [0,50), [50,150), [150,250)
|
||||
let to = vec![
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("u").lt(Value::Int64(50))),
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(50))
|
||||
.and(col("u").lt(Value::Int64(150))),
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(150))
|
||||
.and(col("u").lt(Value::Int64(250))),
|
||||
];
|
||||
|
||||
let subtasks = create_subtasks(&from, &to).unwrap();
|
||||
// All connected into a single component
|
||||
assert_eq!(subtasks.len(), 1);
|
||||
assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]);
|
||||
assert_eq!(subtasks[0].to_expr_indices, vec![0, 1, 2]);
|
||||
// [0,100) -> [0,50), [50,150)
|
||||
// [100,200) -> [50,150), [150,250)
|
||||
assert_eq!(subtasks[0].transition_map[0], vec![0, 1]);
|
||||
assert_eq!(subtasks[0].transition_map[1], vec![1, 2]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_two_components() {
|
||||
// Left: A:[0,10), B:[20,30)
|
||||
let from = vec![
|
||||
col("x")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("x").lt(Value::Int64(10))),
|
||||
col("x")
|
||||
.gt_eq(Value::Int64(20))
|
||||
.and(col("x").lt(Value::Int64(30))),
|
||||
];
|
||||
// Right: C:[5,7), D:[22,28)
|
||||
let to = vec![
|
||||
col("x")
|
||||
.gt_eq(Value::Int64(5))
|
||||
.and(col("x").lt(Value::Int64(7))),
|
||||
col("x")
|
||||
.gt_eq(Value::Int64(22))
|
||||
.and(col("x").lt(Value::Int64(28))),
|
||||
];
|
||||
let mut subtasks = create_subtasks(&from, &to).unwrap();
|
||||
// Deterministic order: left indices sorted, so components may appear in order of discovery.
|
||||
assert_eq!(subtasks.len(), 2);
|
||||
// Sort for stable assertion by smallest left index
|
||||
subtasks.sort_by_key(|s| s.from_expr_indices[0]);
|
||||
assert_eq!(subtasks[0].from_expr_indices, vec![0]);
|
||||
assert_eq!(subtasks[0].to_expr_indices, vec![0]);
|
||||
assert_eq!(subtasks[0].transition_map, vec![vec![0]]);
|
||||
assert_eq!(subtasks[1].from_expr_indices, vec![1]);
|
||||
assert_eq!(subtasks[1].to_expr_indices, vec![1]);
|
||||
assert_eq!(subtasks[1].transition_map, vec![vec![1]]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bridge_single_component() {
|
||||
// Left: [0,10), [10,20)
|
||||
let from = vec![
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(0))
|
||||
.and(col("u").lt(Value::Int64(10))),
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(10))
|
||||
.and(col("u").lt(Value::Int64(20))),
|
||||
];
|
||||
// Right: [5,15), [15,25)
|
||||
let to = vec![
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(5))
|
||||
.and(col("u").lt(Value::Int64(15))),
|
||||
col("u")
|
||||
.gt_eq(Value::Int64(15))
|
||||
.and(col("u").lt(Value::Int64(25))),
|
||||
];
|
||||
let subtasks = create_subtasks(&from, &to).unwrap();
|
||||
assert_eq!(subtasks.len(), 1);
|
||||
assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]);
|
||||
assert_eq!(subtasks[0].to_expr_indices, vec![0, 1]);
|
||||
assert_eq!(subtasks[0].transition_map[0], vec![0]);
|
||||
assert_eq!(subtasks[0].transition_map[1], vec![0, 1]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_all_isolated_no_subtasks() {
|
||||
// No edges at all
|
||||
let from = vec![col("k").lt(Value::Int64(10))];
|
||||
let to = vec![col("k").gt_eq(Value::Int64(10))];
|
||||
let subtasks = create_subtasks(&from, &to).unwrap();
|
||||
assert!(subtasks.is_empty());
|
||||
}
|
||||
}
|
||||
@@ -14,17 +14,14 @@
|
||||
|
||||
//! [`ConstraintPruner`] prunes partition info based on given expressions.
|
||||
|
||||
use std::cmp::Ordering;
|
||||
use std::ops::Bound;
|
||||
|
||||
use GluonOp::*;
|
||||
use ahash::{HashMap, HashSet};
|
||||
use common_telemetry::debug;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::{OrderedF64, OrderedFloat, Value};
|
||||
use partition::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr};
|
||||
use datatypes::value::{OrderedFloat, Value};
|
||||
use partition::collider::{AtomicExpr, Collider};
|
||||
use partition::expr::{Operand, PartitionExpr};
|
||||
use partition::manager::PartitionInfo;
|
||||
use partition::overlap::atomic_exprs_overlap;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::error::Result;
|
||||
@@ -109,13 +106,9 @@ impl ConstraintPruner {
|
||||
}
|
||||
|
||||
fn atomic_sets_overlap(query_atomics: &[&AtomicExpr], partition_atomic: &AtomicExpr) -> bool {
|
||||
for query_atomic in query_atomics {
|
||||
if Self::atomic_constraint_satisfied(query_atomic, partition_atomic) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
query_atomics
|
||||
.iter()
|
||||
.any(|qa| atomic_exprs_overlap(qa, partition_atomic))
|
||||
}
|
||||
|
||||
fn normalize_datatype(
|
||||
@@ -213,196 +206,8 @@ impl ConstraintPruner {
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a single atomic constraint can be satisfied
|
||||
fn atomic_constraint_satisfied(
|
||||
query_atomic: &AtomicExpr,
|
||||
partition_atomic: &AtomicExpr,
|
||||
) -> bool {
|
||||
let mut query_index = 0;
|
||||
let mut partition_index = 0;
|
||||
|
||||
while query_index < query_atomic.nucleons.len()
|
||||
&& partition_index < partition_atomic.nucleons.len()
|
||||
{
|
||||
let query_col = query_atomic.nucleons[query_index].column();
|
||||
let partition_col = partition_atomic.nucleons[partition_index].column();
|
||||
|
||||
match query_col.cmp(partition_col) {
|
||||
Ordering::Equal => {
|
||||
let mut query_index_for_next_col = query_index;
|
||||
let mut partition_index_for_next_col = partition_index;
|
||||
|
||||
while query_index_for_next_col < query_atomic.nucleons.len()
|
||||
&& query_atomic.nucleons[query_index_for_next_col].column() == query_col
|
||||
{
|
||||
query_index_for_next_col += 1;
|
||||
}
|
||||
while partition_index_for_next_col < partition_atomic.nucleons.len()
|
||||
&& partition_atomic.nucleons[partition_index_for_next_col].column()
|
||||
== partition_col
|
||||
{
|
||||
partition_index_for_next_col += 1;
|
||||
}
|
||||
|
||||
let query_range = Self::nucleons_to_range(
|
||||
&query_atomic.nucleons[query_index..query_index_for_next_col],
|
||||
);
|
||||
let partition_range = Self::nucleons_to_range(
|
||||
&partition_atomic.nucleons[partition_index..partition_index_for_next_col],
|
||||
);
|
||||
|
||||
debug!("Comparing two ranges, {query_range:?} and {partition_range:?}");
|
||||
|
||||
query_index = query_index_for_next_col;
|
||||
partition_index = partition_index_for_next_col;
|
||||
|
||||
if !query_range.overlaps_with(&partition_range) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Ordering::Less => {
|
||||
// Query column comes before partition column - skip query column
|
||||
while query_index < query_atomic.nucleons.len()
|
||||
&& query_atomic.nucleons[query_index].column() == query_col
|
||||
{
|
||||
query_index += 1;
|
||||
}
|
||||
}
|
||||
Ordering::Greater => {
|
||||
// Partition column comes before query column - skip partition column
|
||||
while partition_index < partition_atomic.nucleons.len()
|
||||
&& partition_atomic.nucleons[partition_index].column() == partition_col
|
||||
{
|
||||
partition_index += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
/// Convert a slice of nucleons (all for the same column) into a ValueRange
|
||||
fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange {
|
||||
let mut range = ValueRange::new();
|
||||
|
||||
for nucleon in nucleons {
|
||||
let value = nucleon.value();
|
||||
match nucleon.op() {
|
||||
Eq => {
|
||||
range.lower = Bound::Included(value);
|
||||
range.upper = Bound::Included(value);
|
||||
break; // exact value, most restrictive
|
||||
}
|
||||
Lt => {
|
||||
// upper < value
|
||||
range.update_upper(Bound::Excluded(value));
|
||||
}
|
||||
LtEq => {
|
||||
range.update_upper(Bound::Included(value));
|
||||
}
|
||||
Gt => {
|
||||
range.update_lower(Bound::Excluded(value));
|
||||
}
|
||||
GtEq => {
|
||||
range.update_lower(Bound::Included(value));
|
||||
}
|
||||
NotEq => {
|
||||
// handled as two separate atomic exprs elsewhere
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
range
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a value range derived from a group of nucleons for the same column
|
||||
#[derive(Debug, Clone)]
|
||||
struct ValueRange {
|
||||
// lower and upper bounds using standard library Bound semantics
|
||||
lower: Bound<OrderedF64>,
|
||||
upper: Bound<OrderedF64>,
|
||||
}
|
||||
|
||||
impl ValueRange {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
lower: Bound::Unbounded,
|
||||
upper: Bound::Unbounded,
|
||||
}
|
||||
}
|
||||
|
||||
// Update lower bound choosing the more restrictive one
|
||||
fn update_lower(&mut self, new_lower: Bound<OrderedF64>) {
|
||||
match (&self.lower, &new_lower) {
|
||||
(Bound::Unbounded, _) => self.lower = new_lower,
|
||||
(_, Bound::Unbounded) => { /* keep existing */ }
|
||||
(Bound::Included(cur), Bound::Included(new))
|
||||
| (Bound::Excluded(cur), Bound::Included(new))
|
||||
| (Bound::Included(cur), Bound::Excluded(new))
|
||||
| (Bound::Excluded(cur), Bound::Excluded(new)) => {
|
||||
if new > cur {
|
||||
self.lower = new_lower;
|
||||
} else if new == cur {
|
||||
// prefer Excluded over Included for the same value (more restrictive)
|
||||
if matches!(new_lower, Bound::Excluded(_))
|
||||
&& matches!(self.lower, Bound::Included(_))
|
||||
{
|
||||
self.lower = new_lower;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update upper bound choosing the more restrictive one
|
||||
fn update_upper(&mut self, new_upper: Bound<OrderedF64>) {
|
||||
match (&self.upper, &new_upper) {
|
||||
(Bound::Unbounded, _) => self.upper = new_upper,
|
||||
(_, Bound::Unbounded) => { /* keep existing */ }
|
||||
(Bound::Included(cur), Bound::Included(new))
|
||||
| (Bound::Excluded(cur), Bound::Included(new))
|
||||
| (Bound::Included(cur), Bound::Excluded(new))
|
||||
| (Bound::Excluded(cur), Bound::Excluded(new)) => {
|
||||
if new < cur {
|
||||
self.upper = new_upper;
|
||||
} else if new == cur {
|
||||
// prefer Excluded over Included for the same value (more restrictive)
|
||||
if matches!(new_upper, Bound::Excluded(_))
|
||||
&& matches!(self.upper, Bound::Included(_))
|
||||
{
|
||||
self.upper = new_upper;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if this range overlaps with another range
|
||||
fn overlaps_with(&self, other: &ValueRange) -> bool {
|
||||
fn no_overlap(upper: &Bound<OrderedF64>, lower: &Bound<OrderedF64>) -> bool {
|
||||
match (upper, lower) {
|
||||
(Bound::Unbounded, _) | (_, Bound::Unbounded) => false,
|
||||
// u], [l
|
||||
(Bound::Included(u), Bound::Included(l)) => u < l,
|
||||
// u], (l
|
||||
(Bound::Included(u), Bound::Excluded(l))
|
||||
// u), [l
|
||||
| (Bound::Excluded(u), Bound::Included(l))
|
||||
// u), (l
|
||||
| (Bound::Excluded(u), Bound::Excluded(l)) => u <= l,
|
||||
}
|
||||
}
|
||||
|
||||
if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) {
|
||||
return false;
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
// Value range and atomic overlap logic is now refactored into `partition::diff`.
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
Reference in New Issue
Block a user