refactor: remove useless partition legacy code (#5786)

* refactor: remove useless partition legacy code

* also remove error variants

* fix imports
This commit is contained in:
Lei, HUANG
2025-03-27 19:08:25 +08:00
committed by GitHub
parent 9b7b012620
commit 09ef24fd75
9 changed files with 3 additions and 1028 deletions

View File

@@ -15,27 +15,18 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use catalog::kvbackend::MetaKvBackend;
use common_meta::cache::{new_table_route_cache, TableRouteCacheRef};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use datafusion_expr::expr::Expr;
use datafusion_expr::expr_fn::{and, binary_expr, col, or};
use datafusion_expr::{lit, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use meta_client::client::MetaClient;
use moka::future::CacheBuilder;
use partition::columns::RangeColumnsPartitionRule;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use partition::partition::{PartitionBound, PartitionDef};
use partition::range::RangePartitionRule;
use partition::PartitionRuleRef;
use store_api::storage::RegionNumber;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
@@ -207,168 +198,3 @@ pub(crate) async fn create_partition_rule_manager(
partition_manager
}
/// Looks up the partition rules of table ids 1 and 2 through the manager built by
/// `create_partition_rule_manager` and checks their concrete type and contents.
///
/// The test is currently ignored; see the `#[ignore]` reason below.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "TODO(ruihang, weny): WIP new partition rule"]
async fn test_find_partition_rule() {
let partition_manager =
create_partition_rule_manager(Arc::new(MemoryKvBackend::default())).await;
// "one_column_partitioning_table" has id 1
let partition_rule = partition_manager
.find_table_partition_rule(1)
.await
.unwrap();
// A single partition column is expected to produce a `RangePartitionRule`.
let range_rule = partition_rule
.as_any()
.downcast_ref::<RangePartitionRule>()
.unwrap();
assert_eq!(range_rule.column_name(), "a");
assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]);
assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]);
// "two_column_partitioning_table" has id 2
let partition_rule = partition_manager
.find_table_partition_rule(2)
.await
.unwrap();
// Two partition columns are expected to produce a `RangeColumnsPartitionRule`.
let range_columns_rule = partition_rule
.as_any()
.downcast_ref::<RangeColumnsPartitionRule>()
.unwrap();
assert_eq!(range_columns_rule.column_list(), &vec!["a", "b"]);
assert_eq!(
range_columns_rule.value_lists(),
&vec![
vec![
PartitionBound::Value(10_i32.into()),
PartitionBound::Value("hz".into()),
],
vec![
PartitionBound::Value(50_i32.into()),
PartitionBound::Value("sh".into()),
],
vec![PartitionBound::MaxValue, PartitionBound::MaxValue]
]
);
assert_eq!(range_columns_rule.regions(), &vec![1, 2, 3]);
}
/// Exercises `PartitionRuleManager::find_regions_by_filters` against a four-region
/// `RangePartitionRule` on column `a`, covering single filters, multiple filters,
/// unsupported filters, nested AND/OR filters and a contradictory filter.
#[tokio::test(flavor = "multi_thread")]
async fn test_find_regions() {
let kv_backend = Arc::new(MetaKvBackend {
client: Arc::new(MetaClient::default()),
});
let table_route_cache = test_new_table_route_cache(kv_backend.clone());
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache));
// PARTITION BY RANGE (a) (
// PARTITION r1 VALUES LESS THAN (10),
// PARTITION r2 VALUES LESS THAN (20),
// PARTITION r3 VALUES LESS THAN (50),
// PARTITION r4 VALUES LESS THAN (MAXVALUE),
// )
let partition_rule: PartitionRuleRef = Arc::new(RangePartitionRule::new(
"a",
vec![10_i32.into(), 20_i32.into(), 50_i32.into()],
vec![0_u32, 1, 2, 3],
)) as _;
let partition_rule_clone = partition_rule.clone();
// Helper: runs the filters and compares the sorted result with the expected regions.
// Sorting is needed because the result order is not asserted to be stable.
let test = |filters: Vec<Expr>, expect_regions: Vec<RegionNumber>| {
let mut regions = partition_manager
.find_regions_by_filters(partition_rule_clone.clone(), filters.as_slice())
.unwrap();
regions.sort();
assert_eq!(regions, expect_regions);
};
// test simple filter
test(
vec![binary_expr(col("a"), Operator::Lt, lit(10))], // a < 10
vec![0],
);
test(
vec![binary_expr(col("a"), Operator::LtEq, lit(10))], // a <= 10
vec![0, 1],
);
test(
vec![binary_expr(lit(20), Operator::Gt, col("a"))], // 20 > a
vec![0, 1],
);
test(
vec![binary_expr(lit(20), Operator::GtEq, col("a"))], // 20 >= a
vec![0, 1, 2],
);
test(
vec![binary_expr(lit(45), Operator::Eq, col("a"))], // 45 == a
vec![2],
);
test(
vec![binary_expr(col("a"), Operator::NotEq, lit(45))], // a != 45
vec![0, 1, 2, 3],
);
test(
vec![binary_expr(col("a"), Operator::Gt, lit(50))], // a > 50
vec![3],
);
// test multiple filters
test(
vec![
binary_expr(col("a"), Operator::Gt, lit(10)),
binary_expr(col("a"), Operator::Gt, lit(50)),
], // [a > 10, a > 50]
vec![3],
);
// test finding all regions when provided with not supported filters or not partition column
test(
vec![binary_expr(col("row_id"), Operator::LtEq, lit(123))], // row_id <= 123
vec![0, 1, 2, 3],
);
test(
vec![binary_expr(col("c"), Operator::Gt, lit(123))], // c > 123
vec![0, 1, 2, 3],
);
// test complex "AND" or "OR" filters
test(
vec![and(
binary_expr(col("row_id"), Operator::Lt, lit(1)),
or(
binary_expr(col("row_id"), Operator::Lt, lit(1)),
binary_expr(col("a"), Operator::Lt, lit(1)),
),
)], // row_id < 1 AND (row_id < 1 OR a < 1)
vec![0, 1, 2, 3],
);
test(
vec![or(
binary_expr(col("a"), Operator::Lt, lit(20)),
binary_expr(col("a"), Operator::GtEq, lit(20)),
)], // a < 20 OR a >= 20
vec![0, 1, 2, 3],
);
test(
vec![and(
binary_expr(col("a"), Operator::Lt, lit(20)),
binary_expr(col("a"), Operator::Lt, lit(50)),
)], // a < 20 AND a < 50
vec![0, 1],
);
// test failed to find regions by contradictory filters
// The two halves select disjoint region sets, so the intersection is empty and the
// manager is expected to report `FindRegions` instead of returning an empty list.
let regions = partition_manager.find_regions_by_filters(
partition_rule,
vec![and(
binary_expr(col("a"), Operator::Lt, lit(20)),
binary_expr(col("a"), Operator::GtEq, lit(20)),
)]
.as_slice(),
); // a < 20 AND a >= 20
assert!(matches!(
regions.unwrap_err(),
partition::error::Error::FindRegions { .. }
));
}

View File

@@ -1,416 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use datafusion_expr::Operator;
use datatypes::value::Value;
use snafu::ensure;
use store_api::storage::RegionNumber;
use crate::error::{self, Result};
use crate::partition::{PartitionBound, PartitionExpr, PartitionRule};
/// A [RangeColumnsPartitionRule] is very similar to [RangePartitionRule](crate::range::RangePartitionRule) except that it allows
/// partitioning by multiple columns.
///
/// This rule is generated from a create table request, using MySQL's syntax:
///
/// ```SQL
/// CREATE TABLE table_name (
/// columns definition
/// )
/// PARTITION BY RANGE COLUMNS(column_list) (
/// PARTITION region_name VALUES LESS THAN (value_list)[,
/// PARTITION region_name VALUES LESS THAN (value_list)][,
/// ...]
/// )
///
/// column_list:
/// column_name[, column_name][, ...]
///
/// value_list:
/// value[, value][, ...]
/// ```
///
/// Please refer to MySQL's ["RANGE COLUMNS Partitioning"](https://dev.mysql.com/doc/refman/8.0/en/partitioning-columns-range.html)
/// document for more details.
pub struct RangeColumnsPartitionRule {
// Names of the partitioning columns, in declaration order.
column_list: Vec<String>,
// One bound tuple per partition; `value_lists[i]` belongs to `regions[i]`.
value_lists: Vec<Vec<PartitionBound>>,
// Region numbers, parallel to `value_lists`.
regions: Vec<RegionNumber>,
// TODO(LFC): Implement finding regions by all partitioning columns, not by the first one only.
// The first partitioning column's bounds are singled out for finding regions by range.
//
// Theoretically, finding regions in `value_list`s should use all the partition columns' values
// as a whole in the comparison (think of how Rust's vectors are compared to each other). And
// this is how we do it if provided with concrete values (see `find_region` method).
//
// However, when we need to find regions by range, for example, a filter of "x < 100" defined
// in SQL, it is currently unclear how that could be implemented, especially when facing a complex
// filter expression like "a < 1 AND (b > 2 OR c != 3)".
//
// So the first partitioning column is used temporarily in finding regions by range, pending
// further investigation of how MySQL (and others) implemented this feature in detail.
//
// Finding regions only using the first partitioning column is fine. It might return regions that
// actually do not contain the range's value (causing unnecessary table scans), but will
// not lose any data that should have been scanned.
//
// The following two fields act as caches, so we don't need to recalculate them every time.
// `first_column_bounds` holds the distinct first-column bounds in partition order, and
// `first_column_regions[i]` holds the regions whose first-column bound is `first_column_bounds[i]`.
first_column_bounds: Vec<PartitionBound>,
first_column_regions: Vec<Vec<RegionNumber>>,
}
impl RangeColumnsPartitionRule {
    /// Creates a rule from the partitioning columns, the bound tuple of every partition and the
    /// region of every partition (`value_lists[i]` belongs to `regions[i]`).
    ///
    /// The arguments are expected to have been validated in the SQL parsing stage, so no
    /// validation error is reported here. Empty `value_lists` is accepted and simply produces a
    /// rule without any first-column bounds.
    ///
    /// # Panics
    ///
    /// Panics if a bound tuple in `value_lists` is empty, or if `regions` is shorter than
    /// `value_lists`.
    pub fn new(
        column_list: Vec<String>,
        value_lists: Vec<Vec<PartitionBound>>,
        regions: Vec<RegionNumber>,
    ) -> Self {
        // An example range columns partition rule to calculate the first column bounds and regions:
        // SQL:
        //   PARTITION p1 VALUES LESS THAN (10, 'c'),
        //   PARTITION p2 VALUES LESS THAN (20, 'h'),
        //   PARTITION p3 VALUES LESS THAN (20, 'm'),
        //   PARTITION p4 VALUES LESS THAN (50, 'p'),
        //   PARTITION p5 VALUES LESS THAN (MAXVALUE, 'x'),
        //   PARTITION p6 VALUES LESS THAN (MAXVALUE, MAXVALUE),
        // first column bounds:
        //   [10, 20, 50, MAXVALUE]
        // first column regions:
        //   [[1], [2, 3], [4], [5, 6]]
        let mut first_column_bounds = Vec::<PartitionBound>::new();
        let mut first_column_regions = Vec::<Vec<RegionNumber>>::new();
        for (i, value_list) in value_lists.iter().enumerate() {
            let bound = &value_list[0];
            // Indexing (rather than zipping) keeps a length mismatch loud instead of silently
            // dropping partitions.
            let region = regions[i];
            if first_column_bounds.last() == Some(bound) {
                // Same first-column bound as the previous partition: extend its region group.
                // Both vectors grow in lockstep, so the last group always exists here.
                if let Some(group) = first_column_regions.last_mut() {
                    group.push(region);
                }
            } else {
                first_column_bounds.push(bound.clone());
                first_column_regions.push(vec![region]);
            }
        }

        Self {
            column_list,
            value_lists,
            regions,
            first_column_bounds,
            first_column_regions,
        }
    }

    /// Returns the names of the partitioning columns.
    pub fn column_list(&self) -> &Vec<String> {
        &self.column_list
    }

    /// Returns the bound tuple of every partition.
    pub fn value_lists(&self) -> &Vec<Vec<PartitionBound>> {
        &self.value_lists
    }

    /// Returns the region numbers, parallel to [`Self::value_lists`].
    pub fn regions(&self) -> &Vec<RegionNumber> {
        &self.regions
    }
}
impl PartitionRule for RangeColumnsPartitionRule {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn partition_columns(&self) -> Vec<String> {
        self.column_list.clone()
    }

    /// Finds the region that holds the row whose partition column values are `values`.
    ///
    /// # Errors
    ///
    /// Returns `RegionKeysSize` if `values` does not have one value per partition column, and
    /// `FindRegion` if the values sort past the last defined partition.
    fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
        ensure!(
            values.len() == self.column_list.len(),
            error::RegionKeysSizeSnafu {
                expect: self.column_list.len(),
                actual: values.len(),
            }
        );

        // How tuples are compared:
        //   (a, b) < (x, y) <=> (a < x) || ((a == x) && (b < y))
        let values = values
            .iter()
            .cloned()
            .map(PartitionBound::Value)
            .collect::<Vec<PartitionBound>>();

        // Bounds are exclusive upper bounds, so an exact hit belongs to the next partition.
        let index = match self.value_lists.binary_search(&values) {
            Ok(i) => i + 1,
            Err(i) => i,
        };
        ensure!(
            index < self.regions.len(),
            error::FindRegionSnafu {
                reason: format!(
                    "partition index {} is out of range, only {} regions are defined",
                    index,
                    self.regions.len()
                ),
            }
        );
        Ok(self.regions[index])
    }

    /// Finds the candidate regions for `exprs`, using only the expression on the first
    /// partition column.
    ///
    /// All regions are returned (a safe over-approximation) when `exprs` is empty, when any
    /// expression refers to a non-partition column, when no expression refers to the first
    /// partition column, or when the operator is not one of `<`, `<=`, `=`, `!=`, `>`, `>=`.
    fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
        let only_partition_columns = exprs
            .iter()
            .all(|expr| self.column_list.contains(&expr.column));
        // The expressions may all target later partition columns; in that case there is nothing
        // to prune with, so this must not be assumed to exist.
        let first_column_expr = self
            .column_list
            .first()
            .and_then(|first| exprs.iter().find(|expr| &expr.column == first));
        let PartitionExpr { op, value, .. } = match first_column_expr {
            Some(expr) if only_partition_columns => expr,
            _ => return Ok(self.regions.clone()),
        };

        let groups = &self.first_column_regions;
        // `None` means "cannot prune": unsupported operator or a range outside the cached groups.
        let selected = match self
            .first_column_bounds
            .binary_search(&PartitionBound::Value(value.clone()))
        {
            Ok(i) => match op {
                Operator::Lt => groups.get(..=i),
                Operator::LtEq => groups.get(..=(i + 1)),
                Operator::Eq => groups.get((i + 1)..=(i + 1)),
                Operator::Gt | Operator::GtEq => groups.get((i + 1)..),
                Operator::NotEq => Some(&groups[..]),
                _ => None,
            },
            Err(i) => match op {
                Operator::Lt | Operator::LtEq => groups.get(..=i),
                Operator::Eq => groups.get(i..=i),
                Operator::Gt | Operator::GtEq => groups.get(i..),
                Operator::NotEq => Some(&groups[..]),
                _ => None,
            },
        };

        Ok(match selected {
            Some(groups) => groups.iter().flatten().copied().collect(),
            None => self.regions.clone(),
        })
    }
}
// Unit tests for `RangeColumnsPartitionRule`: range lookups by expression and point lookups
// by concrete values.
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use super::*;
use crate::partition::{PartitionBound, PartitionExpr};
/// Checks `find_regions_by_exprs` for every supported comparison operator on the first
/// partition column, and the "return all regions" fallback for a non-partition column.
#[test]
fn test_find_regions() {
// PARTITION BY RANGE COLUMNS(a, b)
// PARTITION p1 VALUES LESS THAN ('hz', 10),
// PARTITION p2 VALUES LESS THAN ('sh', 20),
// PARTITION p3 VALUES LESS THAN ('sh', 50),
// PARTITION p4 VALUES LESS THAN ('sz', 100),
// PARTITION p5 VALUES LESS THAN (MAXVALUE, 200),
// PARTITION p6 VALUES LESS THAN (MAXVALUE, MAXVALUE),
//
// Resulting first column bounds: ['hz', 'sh', 'sz', MAXVALUE]
// Resulting first column regions: [[1], [2, 3], [4], [5, 6]]
let rule = RangeColumnsPartitionRule::new(
vec!["a".to_string(), "b".to_string()],
vec![
vec![
PartitionBound::Value("hz".into()),
PartitionBound::Value(10_i32.into()),
],
vec![
PartitionBound::Value("sh".into()),
PartitionBound::Value(20_i32.into()),
],
vec![
PartitionBound::Value("sh".into()),
PartitionBound::Value(50_i32.into()),
],
vec![
PartitionBound::Value("sz".into()),
PartitionBound::Value(100_i32.into()),
],
vec![
PartitionBound::MaxValue,
PartitionBound::Value(200_i32.into()),
],
vec![PartitionBound::MaxValue, PartitionBound::MaxValue],
],
vec![1, 2, 3, 4, 5, 6],
);
// Helper: applies `a <op> value` (together with a fixed `b < 1`) and compares the regions.
let test = |op: Operator, value: &str, expected_regions: Vec<RegionNumber>| {
let exprs = vec![
// Intentionally fix column b's partition expr to "b < 1". If we support finding
// regions by both columns("a" and "b") in the future, some test cases should fail.
PartitionExpr {
column: "b".to_string(),
op: Operator::Lt,
value: 1_i32.into(),
},
PartitionExpr {
column: "a".to_string(),
op,
value: value.into(),
},
];
let regions = rule.find_regions_by_exprs(&exprs).unwrap();
assert_eq!(
regions,
expected_regions.into_iter().collect::<Vec<RegionNumber>>()
);
};
// `!=` never prunes anything.
test(Operator::NotEq, "hz", vec![1, 2, 3, 4, 5, 6]);
test(Operator::NotEq, "what", vec![1, 2, 3, 4, 5, 6]);
test(Operator::GtEq, "ab", vec![1, 2, 3, 4, 5, 6]);
test(Operator::GtEq, "hz", vec![2, 3, 4, 5, 6]);
test(Operator::GtEq, "ijk", vec![2, 3, 4, 5, 6]);
test(Operator::GtEq, "sh", vec![4, 5, 6]);
test(Operator::GtEq, "ssh", vec![4, 5, 6]);
test(Operator::GtEq, "sz", vec![5, 6]);
test(Operator::GtEq, "zz", vec![5, 6]);
test(Operator::Gt, "ab", vec![1, 2, 3, 4, 5, 6]);
test(Operator::Gt, "hz", vec![2, 3, 4, 5, 6]);
test(Operator::Gt, "ijk", vec![2, 3, 4, 5, 6]);
test(Operator::Gt, "sh", vec![4, 5, 6]);
test(Operator::Gt, "ssh", vec![4, 5, 6]);
test(Operator::Gt, "sz", vec![5, 6]);
test(Operator::Gt, "zz", vec![5, 6]);
// A value equal to a bound belongs to the next group because bounds are exclusive.
test(Operator::Eq, "ab", vec![1]);
test(Operator::Eq, "hz", vec![2, 3]);
test(Operator::Eq, "ijk", vec![2, 3]);
test(Operator::Eq, "sh", vec![4]);
test(Operator::Eq, "ssh", vec![4]);
test(Operator::Eq, "sz", vec![5, 6]);
test(Operator::Eq, "zz", vec![5, 6]);
test(Operator::Lt, "ab", vec![1]);
test(Operator::Lt, "hz", vec![1]);
test(Operator::Lt, "ijk", vec![1, 2, 3]);
test(Operator::Lt, "sh", vec![1, 2, 3]);
test(Operator::Lt, "ssh", vec![1, 2, 3, 4]);
test(Operator::Lt, "sz", vec![1, 2, 3, 4]);
test(Operator::Lt, "zz", vec![1, 2, 3, 4, 5, 6]);
test(Operator::LtEq, "ab", vec![1]);
test(Operator::LtEq, "hz", vec![1, 2, 3]);
test(Operator::LtEq, "ijk", vec![1, 2, 3]);
test(Operator::LtEq, "sh", vec![1, 2, 3, 4]);
test(Operator::LtEq, "ssh", vec![1, 2, 3, 4]);
test(Operator::LtEq, "sz", vec![1, 2, 3, 4, 5, 6]);
test(Operator::LtEq, "zz", vec![1, 2, 3, 4, 5, 6]);
// If any expression refers to a column that is not a partitioning column, return all regions.
let exprs = vec![
PartitionExpr {
column: "c".to_string(),
op: Operator::Lt,
value: 1_i32.into(),
},
PartitionExpr {
column: "a".to_string(),
op: Operator::Lt,
value: "hz".into(),
},
];
let regions = rule.find_regions_by_exprs(&exprs).unwrap();
assert_eq!(regions, vec![1, 2, 3, 4, 5, 6]);
}
/// Checks `find_region` with concrete values for a one-column and a two-column rule,
/// including the error reported when the number of values is wrong.
#[test]
fn test_find_region() {
// PARTITION BY RANGE COLUMNS(a) (
// PARTITION r1 VALUES LESS THAN ('hz'),
// PARTITION r2 VALUES LESS THAN ('sh'),
// PARTITION r3 VALUES LESS THAN (MAXVALUE),
// )
let rule = RangeColumnsPartitionRule::new(
vec!["a".to_string()],
vec![
vec![PartitionBound::Value("hz".into())],
vec![PartitionBound::Value("sh".into())],
vec![PartitionBound::MaxValue],
],
vec![1, 2, 3],
);
// Two values for a one-column rule must be rejected.
assert_matches!(
rule.find_region(&["foo".into(), 1000_i32.into()]),
Err(error::Error::RegionKeysSize {
expect: 1,
actual: 2,
..
})
);
assert_matches!(rule.find_region(&["foo".into()]), Ok(1));
assert_matches!(rule.find_region(&["bar".into()]), Ok(1));
assert_matches!(rule.find_region(&["hz".into()]), Ok(2));
assert_matches!(rule.find_region(&["hzz".into()]), Ok(2));
assert_matches!(rule.find_region(&["sh".into()]), Ok(3));
assert_matches!(rule.find_region(&["zzzz".into()]), Ok(3));
// PARTITION BY RANGE COLUMNS(a, b) (
// PARTITION r1 VALUES LESS THAN ('hz', 10),
// PARTITION r2 VALUES LESS THAN ('hz', 20),
// PARTITION r3 VALUES LESS THAN ('sh', 50),
// PARTITION r4 VALUES LESS THAN (MAXVALUE, MAXVALUE),
// )
let rule = RangeColumnsPartitionRule::new(
vec!["a".to_string(), "b".to_string()],
vec![
vec![
PartitionBound::Value("hz".into()),
PartitionBound::Value(10_i32.into()),
],
vec![
PartitionBound::Value("hz".into()),
PartitionBound::Value(20_i32.into()),
],
vec![
PartitionBound::Value("sh".into()),
PartitionBound::Value(50_i32.into()),
],
vec![PartitionBound::MaxValue, PartitionBound::MaxValue],
],
vec![1, 2, 3, 4],
);
// One value for a two-column rule must be rejected.
assert_matches!(
rule.find_region(&["foo".into()]),
Err(error::Error::RegionKeysSize {
expect: 2,
actual: 1,
..
})
);
// The (a, b) pair is compared as a tuple against the bound tuples.
assert_matches!(rule.find_region(&["foo".into(), 1_i32.into()]), Ok(1));
assert_matches!(rule.find_region(&["bar".into(), 11_i32.into()]), Ok(1));
assert_matches!(rule.find_region(&["hz".into(), 2_i32.into()]), Ok(1));
assert_matches!(rule.find_region(&["hz".into(), 12_i32.into()]), Ok(2));
assert_matches!(rule.find_region(&["hz".into(), 22_i32.into()]), Ok(3));
assert_matches!(rule.find_region(&["hz".into(), 999_i32.into()]), Ok(3));
assert_matches!(rule.find_region(&["hzz".into(), 1_i32.into()]), Ok(3));
assert_matches!(rule.find_region(&["hzz".into(), 999_i32.into()]), Ok(3));
assert_matches!(rule.find_region(&["sh".into(), 49_i32.into()]), Ok(3));
assert_matches!(rule.find_region(&["sh".into(), 50_i32.into()]), Ok(4));
assert_matches!(rule.find_region(&["zzz".into(), 1_i32.into()]), Ok(4));
}
}

View File

@@ -18,7 +18,6 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion_common::ScalarValue;
use datafusion_expr::expr::Expr;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::metadata::TableId;
@@ -93,20 +92,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to find region, reason: {}", reason))]
FindRegion {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to find regions by filters: {:?}", filters))]
FindRegions {
filters: Vec<Expr>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid InsertRequest, reason: {}", reason))]
InvalidInsertRequest {
reason: String,
@@ -201,9 +186,7 @@ impl ErrorExt for Error {
| Error::InvalidExpr { .. }
| Error::UndefinedColumn { .. } => StatusCode::InvalidArguments,
Error::FindRegion { .. }
| Error::FindRegions { .. }
| Error::RegionKeysSize { .. }
Error::RegionKeysSize { .. }
| Error::InvalidInsertRequest { .. }
| Error::InvalidDeleteRequest { .. } => StatusCode::InvalidArguments,

View File

@@ -16,13 +16,11 @@
//! Structs and traits for partitioning rule.
pub mod columns;
pub mod error;
pub mod expr;
pub mod manager;
pub mod multi_dim;
pub mod partition;
pub mod range;
pub mod splitter;
pub use crate::partition::{PartitionRule, PartitionRuleRef};

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::Rows;
@@ -21,17 +21,13 @@ use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::{self, RegionRoute};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::prelude::Value;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, Result};
use crate::multi_dim::MultiDimPartitionRule;
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::partition::{PartitionBound, PartitionDef};
use crate::splitter::RowSplitter;
use crate::{error, PartitionRuleRef};
@@ -175,88 +171,6 @@ impl PartitionRuleManager {
Ok(Arc::new(rule) as _)
}
/// Builds the legacy partition rule of the table identified by `table_id`.
///
/// A table partitioned by a single column gets a [`RangePartitionRule`]; any other number of
/// partition columns gets a [`RangeColumnsPartitionRule`]. The partition columns are read from
/// the first partition returned by `find_table_partitions`.
pub async fn find_table_partition_rule_deprecated(
    &self,
    table_id: TableId,
) -> Result<PartitionRuleRef> {
    let partitions = self.find_table_partitions(table_id).await?;
    debug_assert!(!partitions.is_empty());

    let columns = partitions[0].partition.partition_columns();
    let region_numbers: Vec<RegionNumber> = partitions
        .iter()
        .map(|info| info.id.region_number())
        .collect();

    // TODO(LFC): Serializing and deserializing partition rule is ugly, must find a much more elegant way.
    if columns.len() == 1 {
        // Only concrete values are kept as bounds; the trailing "MAXVALUE" (and any
        // expression bound) is omitted.
        let mut values = Vec::<Value>::new();
        for info in partitions.iter() {
            if let PartitionBound::Value(v) = &info.partition.partition_bounds()[0] {
                values.push(v.clone());
            }
        }
        let rule = RangePartitionRule::new(columns[0].clone(), values, region_numbers);
        return Ok(Arc::new(rule) as _);
    }

    let bound_lists: Vec<Vec<PartitionBound>> = partitions
        .iter()
        .map(|info| info.partition.partition_bounds().clone())
        .collect();
    let rule = RangeColumnsPartitionRule::new(columns.clone(), bound_lists, region_numbers);
    Ok(Arc::new(rule) as _)
}
/// Returns the regions of `partition_rule` that may satisfy every filter in `filters`.
///
/// With no filters, all regions of the rule are returned. Fails with `FindRegions` when the
/// resulting region list is empty.
pub fn find_regions_by_filters(
    &self,
    partition_rule: PartitionRuleRef,
    filters: &[Expr],
) -> Result<Vec<RegionNumber>> {
    let matched: Vec<RegionNumber> = match filters.split_first() {
        None => partition_rule.find_regions_by_exprs(&[])?,
        Some((head, tail)) => {
            let mut candidates = find_regions0(partition_rule.clone(), head)?;
            for filter in tail {
                let hits = find_regions0(partition_rule.clone(), filter)?;
                // A list of filters implicitly means "all of them must hold", so only the
                // regions selected by every filter are kept.
                candidates.retain(|region| hits.contains(region));
                // Fail fast: intersecting an empty set with anything stays empty.
                if candidates.is_empty() {
                    break;
                }
            }
            candidates.into_iter().collect::<Vec<_>>()
        }
    };

    ensure!(
        !matched.is_empty(),
        error::FindRegionsSnafu {
            filters: filters.to_vec()
        }
    );
    Ok(matched)
}
pub async fn find_region_leader(&self, region_id: RegionId) -> Result<Peer> {
let region_routes = &self
.find_physical_table_route(region_id.table_id())
@@ -324,56 +238,3 @@ fn create_partitions_from_region_routes(
Ok(partitions)
}
fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<HashSet<RegionNumber>> {
match filter {
Expr::BinaryExpr(BinaryExpr { left, op, right }) if op.supports_propagation() => {
let column_op_value = match (left.as_ref(), right.as_ref()) {
(Expr::Column(c), Expr::Literal(v)) => Some((&c.name, *op, v)),
(Expr::Literal(v), Expr::Column(c)) => Some((
&c.name,
// Safety: previous branch ensures this is a comparison operator
op.swap().unwrap(),
v,
)),
_ => None,
};
if let Some((column, op, scalar)) = column_op_value {
let value = Value::try_from(scalar.clone()).with_context(|_| {
error::ConvertScalarValueSnafu {
value: scalar.clone(),
}
})?;
return Ok(partition_rule
.find_regions_by_exprs(&[PartitionExpr::new(column, op, value)])?
.into_iter()
.collect::<HashSet<RegionNumber>>());
}
}
Expr::BinaryExpr(BinaryExpr { left, op, right })
if matches!(op, Operator::And | Operator::Or) =>
{
let left_regions = find_regions0(partition_rule.clone(), &left.clone())?;
let right_regions = find_regions0(partition_rule.clone(), &right.clone())?;
let regions = match op {
Operator::And => left_regions
.intersection(&right_regions)
.cloned()
.collect::<HashSet<RegionNumber>>(),
Operator::Or => left_regions
.union(&right_regions)
.cloned()
.collect::<HashSet<RegionNumber>>(),
_ => unreachable!(),
};
return Ok(regions);
}
_ => (),
}
// Returns all regions for not supported partition expr as a safety hatch.
Ok(partition_rule
.find_regions_by_exprs(&[])?
.into_iter()
.collect::<HashSet<RegionNumber>>())
}

View File

@@ -148,13 +148,6 @@ impl PartitionRule for MultiDimPartitionRule {
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
self.find_region(values)
}
/// Ignores `_exprs` and returns a copy of every region of this rule, i.e. no pruning is
/// performed for multi-dimensional rules.
fn find_regions_by_exprs(
&self,
_exprs: &[crate::partition::PartitionExpr],
) -> Result<Vec<RegionNumber>> {
Ok(self.regions.clone())
}
}
/// Helper for [RuleChecker]

View File

@@ -37,11 +37,6 @@ pub trait PartitionRule: Sync + Send {
///
/// Note that the `values` should have the same length as the `partition_columns`.
fn find_region(&self, values: &[Value]) -> Result<RegionNumber>;
/// Finds the target regions by the partition expressions.
///
/// Note that the `exprs` should have the same length as the `partition_columns`.
///
/// NOTE(review): `PartitionRuleManager::find_regions_by_filters` also calls this with an
/// empty slice to obtain all regions, so implementations presumably need to accept that
/// as well; confirm and align this note with the actual contract.
fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>>;
}
/// The right bound(exclusive) of partition range.

View File

@@ -1,252 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use datafusion_expr::Operator;
use datatypes::prelude::*;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::storage::RegionNumber;
use crate::error::{self, Error};
use crate::partition::{PartitionExpr, PartitionRule};
/// [RangePartitionRule] manages the distribution of partitions partitioned by some column's value
/// range. It's generated from a create table request, using MySQL's syntax:
///
/// ```SQL
/// CREATE TABLE table_name (
/// columns definition
/// )
/// PARTITION BY RANGE (column_name) (
/// PARTITION partition_name VALUES LESS THAN (value)[,
/// PARTITION partition_name VALUES LESS THAN (value)][,
/// ...]
/// )
/// ```
///
/// Please refer to MySQL's ["RANGE Partitioning"](https://dev.mysql.com/doc/refman/8.0/en/partitioning-range.html)
/// document for more details.
///
/// Some partition related validations like:
/// - the column used in partitioning must be defined in the create table request
/// - partition name must be unique
/// - range bounds (the "value"s) must be strictly increasing
/// - the last partition range must be bounded by "MAXVALUE"
///
/// have all been done in the create table SQL parsing stage. So we can safely skip some checks on the
/// input arguments.
///
/// # Important Notes on Partition and Region
///
/// Technically, a table "partition" is a concept of logical data sharding, i.e., how the table's data is
/// distributed logically. And a "region" is about how data is placed physically. They should be used
/// in different ways.
///
/// However, currently we have only one region for each partition. For the sake of simplicity, the
/// terms "partition" and "region" are used interchangeably.
///
// TODO(LFC): Further clarify "partition" and "region".
// Could be creating an extra layer between partition and region.
#[derive(Debug, Serialize, Deserialize)]
pub struct RangePartitionRule {
// Name of the single partitioning column.
column_name: String,
// Does not store the last "MAXVALUE" bound, because this makes the binary search for
// partitions easier (besides, it's hard to represent "MAXVALUE" in our `Value`).
// The length of `bounds` is therefore one less than that of `regions`.
bounds: Vec<Value>,
// Region numbers in partition order; `regions[i]` holds the values below `bounds[i]`,
// and the last region holds everything from the last bound upwards.
regions: Vec<RegionNumber>,
}
impl RangePartitionRule {
/// Creates a rule for `column_name` from the given bounds and regions.
///
/// No validation is performed here; the arguments are stored as they are.
pub fn new(
column_name: impl Into<String>,
bounds: Vec<Value>,
regions: Vec<RegionNumber>,
) -> Self {
Self {
column_name: column_name.into(),
bounds,
regions,
}
}
/// Returns the name of the partitioning column.
pub fn column_name(&self) -> &String {
&self.column_name
}
/// Returns all regions of this rule, in partition order.
pub fn all_regions(&self) -> &Vec<RegionNumber> {
&self.regions
}
/// Returns the stored range bounds (without the trailing "MAXVALUE").
pub fn bounds(&self) -> &Vec<Value> {
&self.bounds
}
}
impl PartitionRule for RangePartitionRule {
fn as_any(&self) -> &dyn Any {
self
}
fn partition_columns(&self) -> Vec<String> {
vec![self.column_name().to_string()]
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error> {
debug_assert_eq!(
values.len(),
1,
"RangePartitionRule can only handle one partition value, actual {}",
values.len()
);
let value = &values[0];
Ok(match self.bounds.binary_search(value) {
Ok(i) => self.regions[i + 1],
Err(i) => self.regions[i],
})
}
fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
if exprs.is_empty() {
return Ok(self.regions.clone());
}
debug_assert_eq!(
exprs.len(),
1,
"RangePartitionRule can only handle one partition expr, actual {}",
exprs.len()
);
let PartitionExpr { column, op, value } =
exprs.first().context(error::FindRegionSnafu {
reason: "no partition expr is provided",
})?;
let regions = if column == self.column_name() {
// an example of bounds and regions:
// SQL:
// PARTITION p1 VALUES LESS THAN (10),
// PARTITION p2 VALUES LESS THAN (20),
// PARTITION p3 VALUES LESS THAN (50),
// PARTITION p4 VALUES LESS THAN (MAXVALUE),
// bounds: [10, 20, 50]
// regions: [1, 2, 3, 4]
match self.bounds.binary_search(value) {
Ok(i) => match op {
Operator::Lt => &self.regions[..=i],
Operator::LtEq => &self.regions[..=(i + 1)],
Operator::Eq => &self.regions[(i + 1)..=(i + 1)],
Operator::Gt | Operator::GtEq => &self.regions[(i + 1)..],
Operator::NotEq => &self.regions[..],
_ => unimplemented!(),
},
Err(i) => match op {
Operator::Lt | Operator::LtEq => &self.regions[..=i],
Operator::Eq => &self.regions[i..=i],
Operator::Gt | Operator::GtEq => &self.regions[i..],
Operator::NotEq => &self.regions[..],
_ => unimplemented!(),
},
}
.to_vec()
} else {
self.all_regions().clone()
};
Ok(regions)
}
}
// Unit tests for `RangePartitionRule::find_regions_by_exprs`.
#[cfg(test)]
mod test {
use datafusion_expr::Operator;
use super::*;
use crate::partition::PartitionExpr;
/// Checks the regions selected for every supported comparison operator on the partition
/// column, and the "return all regions" fallback for another column.
#[test]
fn test_find_regions() {
// PARTITION BY RANGE (a) (
// PARTITION p1 VALUES LESS THAN ('hz'),
// PARTITION p2 VALUES LESS THAN ('sh'),
// PARTITION p3 VALUES LESS THAN ('sz'),
// PARTITION p4 VALUES LESS THAN (MAXVALUE),
// )
let rule = RangePartitionRule {
column_name: "a".to_string(),
bounds: vec!["hz".into(), "sh".into(), "sz".into()],
regions: vec![1, 2, 3, 4],
};
// Helper: applies `column <op> value` and compares the regions with the expectation.
let test =
|column: &str, op: Operator, value: &str, expected_regions: Vec<RegionNumber>| {
let expr = PartitionExpr {
column: column.to_string(),
op,
value: value.into(),
};
let regions = rule.find_regions_by_exprs(&[expr]).unwrap();
assert_eq!(
regions,
expected_regions.into_iter().collect::<Vec<RegionNumber>>()
);
};
// `!=` never prunes anything.
test("a", Operator::NotEq, "hz", vec![1, 2, 3, 4]);
test("a", Operator::NotEq, "what", vec![1, 2, 3, 4]);
test("a", Operator::GtEq, "ab", vec![1, 2, 3, 4]);
test("a", Operator::GtEq, "hz", vec![2, 3, 4]);
test("a", Operator::GtEq, "ijk", vec![2, 3, 4]);
test("a", Operator::GtEq, "sh", vec![3, 4]);
test("a", Operator::GtEq, "ssh", vec![3, 4]);
test("a", Operator::GtEq, "sz", vec![4]);
test("a", Operator::GtEq, "zz", vec![4]);
test("a", Operator::Gt, "ab", vec![1, 2, 3, 4]);
test("a", Operator::Gt, "hz", vec![2, 3, 4]);
test("a", Operator::Gt, "ijk", vec![2, 3, 4]);
test("a", Operator::Gt, "sh", vec![3, 4]);
test("a", Operator::Gt, "ssh", vec![3, 4]);
test("a", Operator::Gt, "sz", vec![4]);
test("a", Operator::Gt, "zz", vec![4]);
// A value equal to a bound belongs to the next region because bounds are exclusive.
test("a", Operator::Eq, "ab", vec![1]);
test("a", Operator::Eq, "hz", vec![2]);
test("a", Operator::Eq, "ijk", vec![2]);
test("a", Operator::Eq, "sh", vec![3]);
test("a", Operator::Eq, "ssh", vec![3]);
test("a", Operator::Eq, "sz", vec![4]);
test("a", Operator::Eq, "zz", vec![4]);
test("a", Operator::Lt, "ab", vec![1]);
test("a", Operator::Lt, "hz", vec![1]);
test("a", Operator::Lt, "ijk", vec![1, 2]);
test("a", Operator::Lt, "sh", vec![1, 2]);
test("a", Operator::Lt, "ssh", vec![1, 2, 3]);
test("a", Operator::Lt, "sz", vec![1, 2, 3]);
test("a", Operator::Lt, "zz", vec![1, 2, 3, 4]);
test("a", Operator::LtEq, "ab", vec![1]);
test("a", Operator::LtEq, "hz", vec![1, 2]);
test("a", Operator::LtEq, "ijk", vec![1, 2]);
test("a", Operator::LtEq, "sh", vec![1, 2, 3]);
test("a", Operator::LtEq, "ssh", vec![1, 2, 3]);
test("a", Operator::LtEq, "sz", vec![1, 2, 3, 4]);
test("a", Operator::LtEq, "zz", vec![1, 2, 3, 4]);
// An expression on a column other than the partition column selects all regions.
test("b", Operator::Lt, "1", vec![1, 2, 3, 4]);
}
}

View File

@@ -139,7 +139,6 @@ mod tests {
use serde::{Deserialize, Serialize};
use super::*;
use crate::partition::PartitionExpr;
use crate::PartitionRule;
fn mock_rows() -> Rows {
@@ -210,10 +209,6 @@ mod tests {
Ok(val.parse::<u32>().unwrap() % 2)
}
// Stub required by the `PartitionRule` trait; calling it panics via `unimplemented!()`.
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -237,10 +232,6 @@ mod tests {
Ok(val)
}
// Stub required by the `PartitionRule` trait; calling it panics via `unimplemented!()`.
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -258,10 +249,6 @@ mod tests {
fn find_region(&self, _values: &[Value]) -> Result<RegionNumber> {
Ok(0)
}
// Stub required by the `PartitionRule` trait; calling it panics via `unimplemented!()`.
fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result<Vec<RegionNumber>> {
unimplemented!()
}
}
#[test]