From 09ef24fd7540a4f9f35fe6a9094afa65419a3ea7 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Thu, 27 Mar 2025 19:08:25 +0800 Subject: [PATCH] refactor: remove useless partition legacy code (#5786) * refactor: remove useless partition legacy code * also remove error variants * fix imports --- src/operator/src/tests/partition_manager.rs | 174 -------- src/partition/src/columns.rs | 416 -------------------- src/partition/src/error.rs | 19 +- src/partition/src/lib.rs | 2 - src/partition/src/manager.rs | 143 +------ src/partition/src/multi_dim.rs | 7 - src/partition/src/partition.rs | 5 - src/partition/src/range.rs | 252 ------------ src/partition/src/splitter.rs | 13 - 9 files changed, 3 insertions(+), 1028 deletions(-) delete mode 100644 src/partition/src/columns.rs delete mode 100644 src/partition/src/range.rs diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index 622b8e104a..4e74e1b128 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -15,27 +15,18 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use catalog::kvbackend::MetaKvBackend; use common_meta::cache::{new_table_route_cache, TableRouteCacheRef}; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManager; -use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; -use datafusion_expr::expr::Expr; -use datafusion_expr::expr_fn::{and, binary_expr, col, or}; -use datafusion_expr::{lit, Operator}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; -use meta_client::client::MetaClient; use moka::future::CacheBuilder; -use partition::columns::RangeColumnsPartitionRule; use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use partition::partition::{PartitionBound, PartitionDef}; -use partition::range::RangePartitionRule; -use partition::PartitionRuleRef; use store_api::storage::RegionNumber; use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; @@ -207,168 +198,3 @@ pub(crate) async fn create_partition_rule_manager( partition_manager } - -#[tokio::test(flavor = "multi_thread")] -#[ignore = "TODO(ruihang, weny): WIP new partition rule"] -async fn test_find_partition_rule() { - let partition_manager = - create_partition_rule_manager(Arc::new(MemoryKvBackend::default())).await; - - // "one_column_partitioning_table" has id 1 - let partition_rule = partition_manager - .find_table_partition_rule(1) - .await - .unwrap(); - let range_rule = partition_rule - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(range_rule.column_name(), "a"); - assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]); - assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]); - - // "two_column_partitioning_table" has table 2 - let partition_rule = partition_manager - .find_table_partition_rule(2) - .await - .unwrap(); - let range_columns_rule = partition_rule - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(range_columns_rule.column_list(), &vec!["a", "b"]); - assert_eq!( - range_columns_rule.value_lists(), - &vec![ - vec![ - PartitionBound::Value(10_i32.into()), - PartitionBound::Value("hz".into()), - ], - vec![ - PartitionBound::Value(50_i32.into()), - PartitionBound::Value("sh".into()), - ], - vec![PartitionBound::MaxValue, PartitionBound::MaxValue] - ] - ); - assert_eq!(range_columns_rule.regions(), &vec![1, 2, 3]); -} - -#[tokio::test(flavor = "multi_thread")] -async fn test_find_regions() { - let kv_backend = Arc::new(MetaKvBackend { - client: Arc::new(MetaClient::default()), - }); - let table_route_cache = test_new_table_route_cache(kv_backend.clone()); - let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend, table_route_cache)); - - // PARTITION BY RANGE (a) ( - // PARTITION r1 VALUES LESS THAN (10), - // PARTITION r2 VALUES LESS THAN (20), - // PARTITION r3 VALUES LESS THAN (50), - // PARTITION r4 VALUES LESS THAN (MAXVALUE), - // ) - let partition_rule: PartitionRuleRef = Arc::new(RangePartitionRule::new( - "a", - vec![10_i32.into(), 20_i32.into(), 50_i32.into()], - vec![0_u32, 1, 2, 3], - )) as _; - - let partition_rule_clone = partition_rule.clone(); - let test = |filters: Vec, expect_regions: Vec| { - let mut regions = partition_manager - .find_regions_by_filters(partition_rule_clone.clone(), filters.as_slice()) - .unwrap(); - regions.sort(); - assert_eq!(regions, expect_regions); - }; - - // test simple filter - test( - vec![binary_expr(col("a"), Operator::Lt, lit(10))], // a < 10 - vec![0], - ); - test( - vec![binary_expr(col("a"), Operator::LtEq, lit(10))], // a <= 10 - vec![0, 1], - ); - test( - vec![binary_expr(lit(20), Operator::Gt, col("a"))], // 20 > a - vec![0, 1], - ); - test( - vec![binary_expr(lit(20), Operator::GtEq, col("a"))], // 20 >= a - vec![0, 1, 2], - ); - test( - vec![binary_expr(lit(45), Operator::Eq, col("a"))], // 45 == a - vec![2], - ); - test( - vec![binary_expr(col("a"), Operator::NotEq, lit(45))], // a != 45 - vec![0, 1, 2, 3], - ); - test( - vec![binary_expr(col("a"), Operator::Gt, lit(50))], // a > 50 - vec![3], - ); - - // test multiple filters - test( - vec![ - binary_expr(col("a"), Operator::Gt, lit(10)), - binary_expr(col("a"), Operator::Gt, lit(50)), - ], // [a > 10, a > 50] - vec![3], - ); - - // test finding all regions when provided with not supported filters or not partition column - test( - vec![binary_expr(col("row_id"), Operator::LtEq, lit(123))], // row_id <= 123 - vec![0, 1, 2, 3], - ); - test( - vec![binary_expr(col("c"), Operator::Gt, lit(123))], // c > 789 - vec![0, 1, 2, 3], - ); - - // test complex "AND" or "OR" filters - test( - vec![and( - binary_expr(col("row_id"), Operator::Lt, lit(1)), - or( - binary_expr(col("row_id"), Operator::Lt, lit(1)), - binary_expr(col("a"), Operator::Lt, lit(1)), - ), - )], // row_id < 1 OR (row_id < 1 AND a > 1) - vec![0, 1, 2, 3], - ); - test( - vec![or( - binary_expr(col("a"), Operator::Lt, lit(20)), - binary_expr(col("a"), Operator::GtEq, lit(20)), - )], // a < 20 OR a >= 20 - vec![0, 1, 2, 3], - ); - test( - vec![and( - binary_expr(col("a"), Operator::Lt, lit(20)), - binary_expr(col("a"), Operator::Lt, lit(50)), - )], // a < 20 AND a < 50 - vec![0, 1], - ); - - // test failed to find regions by contradictory filters - let regions = partition_manager.find_regions_by_filters( - partition_rule, - vec![and( - binary_expr(col("a"), Operator::Lt, lit(20)), - binary_expr(col("a"), Operator::GtEq, lit(20)), - )] - .as_slice(), - ); // a < 20 AND a >= 20 - assert!(matches!( - regions.unwrap_err(), - partition::error::Error::FindRegions { .. } - )); -} diff --git a/src/partition/src/columns.rs b/src/partition/src/columns.rs deleted file mode 100644 index bb1572a60d..0000000000 --- a/src/partition/src/columns.rs +++ /dev/null @@ -1,416 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; - -use datafusion_expr::Operator; -use datatypes::value::Value; -use snafu::ensure; -use store_api::storage::RegionNumber; - -use crate::error::{self, Result}; -use crate::partition::{PartitionBound, PartitionExpr, PartitionRule}; - -/// A [RangeColumnsPartitionRule] is very similar to [RangePartitionRule](crate::range::RangePartitionRule) except that it allows -/// partitioning by multiple columns. -/// -/// This rule is generated from create table request, using MySQL's syntax: -/// -/// ```SQL -/// CREATE TABLE table_name ( -/// columns definition -/// ) -/// PARTITION BY RANGE COLUMNS(column_list) ( -/// PARTITION region_name VALUES LESS THAN (value_list)[, -/// PARTITION region_name VALUES LESS THAN (value_list)][, -/// ...] -/// ) -/// -/// column_list: -/// column_name[, column_name][, ...] -/// -/// value_list: -/// value[, value][, ...] -/// ``` -/// -/// Please refer to MySQL's ["RANGE COLUMNS Partitioning"](https://dev.mysql.com/doc/refman/8.0/en/partitioning-columns-range.html) -/// document for more details. -pub struct RangeColumnsPartitionRule { - column_list: Vec, - value_lists: Vec>, - regions: Vec, - - // TODO(LFC): Implement finding regions by all partitioning columns, not by the first one only. - // Singled out the first partitioning column's bounds for finding regions by range. - // - // Theoretically, finding regions in `value_list`s should use all the partition columns values - // as a whole in the comparison (think of how Rust's vector is compared to each other). And - // this is how we do it if provided with concrete values (see `find_region` method). - // - // However, when we need to find regions by range, for example, a filter of "x < 100" defined - // in SQL, currently I'm not quite sure how that could be implemented. Especially facing the complex - // filter expression like "a < 1 AND (b > 2 OR c != 3)". - // - // So I decided to use the first partitioning column temporarily in finding regions by range, - // and further investigate how MySQL (and others) implemented this feature in detail. - // - // Finding regions only using the first partitioning column is fine. It might return regions that - // actually do not contain the range's value (causing unnecessary table scans), but will - // not lose any data that should have been scanned. - // - // The following two fields are acted as caches, so we don't need to recalculate them every time. - first_column_bounds: Vec, - first_column_regions: Vec>, -} - -impl RangeColumnsPartitionRule { - // It's assured that input arguments are valid because they are checked in SQL parsing stage. - // So we can skip validating them. - pub fn new( - column_list: Vec, - value_lists: Vec>, - regions: Vec, - ) -> Self { - // An example range columns partition rule to calculate the first column bounds and regions: - // SQL: - // PARTITION p1 VALUES LESS THAN (10, 'c'), - // PARTITION p2 VALUES LESS THAN (20, 'h'), - // PARTITION p3 VALUES LESS THAN (20, 'm'), - // PARTITION p4 VALUES LESS THAN (50, 'p'), - // PARTITION p5 VALUES LESS THAN (MAXVALUE, 'x'), - // PARTITION p6 VALUES LESS THAN (MAXVALUE, MAXVALUE), - // first column bounds: - // [10, 20, 50, MAXVALUE] - // first column regions: - // [[1], [2, 3], [4], [5, 6]] - - let first_column_bounds = value_lists - .iter() - .map(|x| &x[0]) - .collect::>(); - - let mut distinct_bounds = Vec::::new(); - distinct_bounds.push(first_column_bounds[0].clone()); - let mut first_column_regions = Vec::>::new(); - first_column_regions.push(vec![regions[0]]); - - for i in 1..first_column_bounds.len() { - if first_column_bounds[i] == &distinct_bounds[distinct_bounds.len() - 1] { - first_column_regions[distinct_bounds.len() - 1].push(regions[i]); - } else { - distinct_bounds.push(first_column_bounds[i].clone()); - first_column_regions.push(vec![regions[i]]); - } - } - - Self { - column_list, - value_lists, - regions, - first_column_bounds: distinct_bounds, - first_column_regions, - } - } - - pub fn column_list(&self) -> &Vec { - &self.column_list - } - - pub fn value_lists(&self) -> &Vec> { - &self.value_lists - } - - pub fn regions(&self) -> &Vec { - &self.regions - } -} - -impl PartitionRule for RangeColumnsPartitionRule { - fn as_any(&self) -> &dyn Any { - self - } - - fn partition_columns(&self) -> Vec { - self.column_list.clone() - } - - fn find_region(&self, values: &[Value]) -> Result { - ensure!( - values.len() == self.column_list.len(), - error::RegionKeysSizeSnafu { - expect: self.column_list.len(), - actual: values.len(), - } - ); - - // How tuple is compared: - // (a, b) < (x, y) <= (a < x) || ((a == x) && (b < y)) - let values = values - .iter() - .map(|v| PartitionBound::Value(v.clone())) - .collect::>(); - Ok(match self.value_lists.binary_search(&values) { - Ok(i) => self.regions[i + 1], - Err(i) => self.regions[i], - }) - } - - fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result> { - let regions = - if !exprs.is_empty() && exprs.iter().all(|x| self.column_list.contains(&x.column)) { - let PartitionExpr { - column: _, - op, - value, - } = exprs - .iter() - .find(|x| x.column == self.column_list[0]) - // "unwrap" is safe because we have checked that "self.column_list" contains all columns in "exprs" - .unwrap(); - - let regions = &self.first_column_regions; - match self - .first_column_bounds - .binary_search(&PartitionBound::Value(value.clone())) - { - Ok(i) => match op { - Operator::Lt => ®ions[..=i], - Operator::LtEq => ®ions[..=(i + 1)], - Operator::Eq => ®ions[(i + 1)..=(i + 1)], - Operator::Gt | Operator::GtEq => ®ions[(i + 1)..], - Operator::NotEq => ®ions[..], - _ => unimplemented!(), - }, - Err(i) => match op { - Operator::Lt | Operator::LtEq => ®ions[..=i], - Operator::Eq => ®ions[i..=i], - Operator::Gt | Operator::GtEq => ®ions[i..], - Operator::NotEq => ®ions[..], - _ => unimplemented!(), - }, - } - .iter() - .flatten() - .cloned() - .collect::>() - } else { - self.regions.clone() - }; - Ok(regions) - } -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use super::*; - use crate::partition::{PartitionBound, PartitionExpr}; - - #[test] - fn test_find_regions() { - // PARTITION BY RANGE COLUMNS(a, b) - // PARTITION p1 VALUES LESS THAN ('hz', 10), - // PARTITION p2 VALUES LESS THAN ('sh', 20), - // PARTITION p3 VALUES LESS THAN ('sh', 50), - // PARTITION p4 VALUES LESS THAN ('sz', 100), - // PARTITION p5 VALUES LESS THAN (MAXVALUE, 200), - // PARTITION p6 VALUES LESS THAN (MAXVALUE, MAXVALUE), - let rule = RangeColumnsPartitionRule::new( - vec!["a".to_string(), "b".to_string()], - vec![ - vec![ - PartitionBound::Value("hz".into()), - PartitionBound::Value(10_i32.into()), - ], - vec![ - PartitionBound::Value("sh".into()), - PartitionBound::Value(20_i32.into()), - ], - vec![ - PartitionBound::Value("sh".into()), - PartitionBound::Value(50_i32.into()), - ], - vec![ - PartitionBound::Value("sz".into()), - PartitionBound::Value(100_i32.into()), - ], - vec![ - PartitionBound::MaxValue, - PartitionBound::Value(200_i32.into()), - ], - vec![PartitionBound::MaxValue, PartitionBound::MaxValue], - ], - vec![1, 2, 3, 4, 5, 6], - ); - - let test = |op: Operator, value: &str, expected_regions: Vec| { - let exprs = vec![ - // Intentionally fix column b's partition expr to "b < 1". If we support finding - // regions by both columns("a" and "b") in the future, some test cases should fail. - PartitionExpr { - column: "b".to_string(), - op: Operator::Lt, - value: 1_i32.into(), - }, - PartitionExpr { - column: "a".to_string(), - op, - value: value.into(), - }, - ]; - let regions = rule.find_regions_by_exprs(&exprs).unwrap(); - assert_eq!( - regions, - expected_regions.into_iter().collect::>() - ); - }; - - test(Operator::NotEq, "hz", vec![1, 2, 3, 4, 5, 6]); - test(Operator::NotEq, "what", vec![1, 2, 3, 4, 5, 6]); - - test(Operator::GtEq, "ab", vec![1, 2, 3, 4, 5, 6]); - test(Operator::GtEq, "hz", vec![2, 3, 4, 5, 6]); - test(Operator::GtEq, "ijk", vec![2, 3, 4, 5, 6]); - test(Operator::GtEq, "sh", vec![4, 5, 6]); - test(Operator::GtEq, "ssh", vec![4, 5, 6]); - test(Operator::GtEq, "sz", vec![5, 6]); - test(Operator::GtEq, "zz", vec![5, 6]); - - test(Operator::Gt, "ab", vec![1, 2, 3, 4, 5, 6]); - test(Operator::Gt, "hz", vec![2, 3, 4, 5, 6]); - test(Operator::Gt, "ijk", vec![2, 3, 4, 5, 6]); - test(Operator::Gt, "sh", vec![4, 5, 6]); - test(Operator::Gt, "ssh", vec![4, 5, 6]); - test(Operator::Gt, "sz", vec![5, 6]); - test(Operator::Gt, "zz", vec![5, 6]); - - test(Operator::Eq, "ab", vec![1]); - test(Operator::Eq, "hz", vec![2, 3]); - test(Operator::Eq, "ijk", vec![2, 3]); - test(Operator::Eq, "sh", vec![4]); - test(Operator::Eq, "ssh", vec![4]); - test(Operator::Eq, "sz", vec![5, 6]); - test(Operator::Eq, "zz", vec![5, 6]); - - test(Operator::Lt, "ab", vec![1]); - test(Operator::Lt, "hz", vec![1]); - test(Operator::Lt, "ijk", vec![1, 2, 3]); - test(Operator::Lt, "sh", vec![1, 2, 3]); - test(Operator::Lt, "ssh", vec![1, 2, 3, 4]); - test(Operator::Lt, "sz", vec![1, 2, 3, 4]); - test(Operator::Lt, "zz", vec![1, 2, 3, 4, 5, 6]); - - test(Operator::LtEq, "ab", vec![1]); - test(Operator::LtEq, "hz", vec![1, 2, 3]); - test(Operator::LtEq, "ijk", vec![1, 2, 3]); - test(Operator::LtEq, "sh", vec![1, 2, 3, 4]); - test(Operator::LtEq, "ssh", vec![1, 2, 3, 4]); - test(Operator::LtEq, "sz", vec![1, 2, 3, 4, 5, 6]); - test(Operator::LtEq, "zz", vec![1, 2, 3, 4, 5, 6]); - - // If trying to find regions that is not partitioning column, return all regions. - let exprs = vec![ - PartitionExpr { - column: "c".to_string(), - op: Operator::Lt, - value: 1_i32.into(), - }, - PartitionExpr { - column: "a".to_string(), - op: Operator::Lt, - value: "hz".into(), - }, - ]; - let regions = rule.find_regions_by_exprs(&exprs).unwrap(); - assert_eq!(regions, vec![1, 2, 3, 4, 5, 6]); - } - - #[test] - fn test_find_region() { - // PARTITION BY RANGE COLUMNS(a) ( - // PARTITION r1 VALUES LESS THAN ('hz'), - // PARTITION r2 VALUES LESS THAN ('sh'), - // PARTITION r3 VALUES LESS THAN (MAXVALUE), - // ) - let rule = RangeColumnsPartitionRule::new( - vec!["a".to_string()], - vec![ - vec![PartitionBound::Value("hz".into())], - vec![PartitionBound::Value("sh".into())], - vec![PartitionBound::MaxValue], - ], - vec![1, 2, 3], - ); - assert_matches!( - rule.find_region(&["foo".into(), 1000_i32.into()]), - Err(error::Error::RegionKeysSize { - expect: 1, - actual: 2, - .. - }) - ); - assert_matches!(rule.find_region(&["foo".into()]), Ok(1)); - assert_matches!(rule.find_region(&["bar".into()]), Ok(1)); - assert_matches!(rule.find_region(&["hz".into()]), Ok(2)); - assert_matches!(rule.find_region(&["hzz".into()]), Ok(2)); - assert_matches!(rule.find_region(&["sh".into()]), Ok(3)); - assert_matches!(rule.find_region(&["zzzz".into()]), Ok(3)); - - // PARTITION BY RANGE COLUMNS(a, b) ( - // PARTITION r1 VALUES LESS THAN ('hz', 10), - // PARTITION r2 VALUES LESS THAN ('hz', 20), - // PARTITION r3 VALUES LESS THAN ('sh', 50), - // PARTITION r4 VALUES LESS THAN (MAXVALUE, MAXVALUE), - // ) - let rule = RangeColumnsPartitionRule::new( - vec!["a".to_string(), "b".to_string()], - vec![ - vec![ - PartitionBound::Value("hz".into()), - PartitionBound::Value(10_i32.into()), - ], - vec![ - PartitionBound::Value("hz".into()), - PartitionBound::Value(20_i32.into()), - ], - vec![ - PartitionBound::Value("sh".into()), - PartitionBound::Value(50_i32.into()), - ], - vec![PartitionBound::MaxValue, PartitionBound::MaxValue], - ], - vec![1, 2, 3, 4], - ); - assert_matches!( - rule.find_region(&["foo".into()]), - Err(error::Error::RegionKeysSize { - expect: 2, - actual: 1, - .. - }) - ); - assert_matches!(rule.find_region(&["foo".into(), 1_i32.into()]), Ok(1)); - assert_matches!(rule.find_region(&["bar".into(), 11_i32.into()]), Ok(1)); - assert_matches!(rule.find_region(&["hz".into(), 2_i32.into()]), Ok(1)); - assert_matches!(rule.find_region(&["hz".into(), 12_i32.into()]), Ok(2)); - assert_matches!(rule.find_region(&["hz".into(), 22_i32.into()]), Ok(3)); - assert_matches!(rule.find_region(&["hz".into(), 999_i32.into()]), Ok(3)); - assert_matches!(rule.find_region(&["hzz".into(), 1_i32.into()]), Ok(3)); - assert_matches!(rule.find_region(&["hzz".into(), 999_i32.into()]), Ok(3)); - assert_matches!(rule.find_region(&["sh".into(), 49_i32.into()]), Ok(3)); - assert_matches!(rule.find_region(&["sh".into(), 50_i32.into()]), Ok(4)); - assert_matches!(rule.find_region(&["zzz".into(), 1_i32.into()]), Ok(4)); - } -} diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index 92d2f560d4..00c18695dc 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -18,7 +18,6 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion_common::ScalarValue; -use datafusion_expr::expr::Expr; use snafu::{Location, Snafu}; use store_api::storage::RegionId; use table::metadata::TableId; @@ -93,20 +92,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to find region, reason: {}", reason))] - FindRegion { - reason: String, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to find regions by filters: {:?}", filters))] - FindRegions { - filters: Vec, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid InsertRequest, reason: {}", reason))] InvalidInsertRequest { reason: String, @@ -201,9 +186,7 @@ impl ErrorExt for Error { | Error::InvalidExpr { .. } | Error::UndefinedColumn { .. } => StatusCode::InvalidArguments, - Error::FindRegion { .. } - | Error::FindRegions { .. } - | Error::RegionKeysSize { .. } + Error::RegionKeysSize { .. } | Error::InvalidInsertRequest { .. } | Error::InvalidDeleteRequest { .. } => StatusCode::InvalidArguments, diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs index e088c7c841..b1843a1093 100644 --- a/src/partition/src/lib.rs +++ b/src/partition/src/lib.rs @@ -16,13 +16,11 @@ //! Structs and traits for partitioning rule. -pub mod columns; pub mod error; pub mod expr; pub mod manager; pub mod multi_dim; pub mod partition; -pub mod range; pub mod splitter; pub use crate::partition::{PartitionRule, PartitionRuleRef}; diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 1e33eda54d..6a96da06e4 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use api::v1::Rows; @@ -21,17 +21,13 @@ use common_meta::key::table_route::{PhysicalTableRouteValue, TableRouteManager}; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::rpc::router::{self, RegionRoute}; -use datafusion_expr::{BinaryExpr, Expr, Operator}; -use datatypes::prelude::Value; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; -use crate::columns::RangeColumnsPartitionRule; use crate::error::{FindLeaderSnafu, Result}; use crate::multi_dim::MultiDimPartitionRule; -use crate::partition::{PartitionBound, PartitionDef, PartitionExpr}; -use crate::range::RangePartitionRule; +use crate::partition::{PartitionBound, PartitionDef}; use crate::splitter::RowSplitter; use crate::{error, PartitionRuleRef}; @@ -175,88 +171,6 @@ impl PartitionRuleManager { Ok(Arc::new(rule) as _) } - /// Get partition rule of given table. - pub async fn find_table_partition_rule_deprecated( - &self, - table_id: TableId, - ) -> Result { - let partitions = self.find_table_partitions(table_id).await?; - - debug_assert!(!partitions.is_empty()); - - let partition_columns = partitions[0].partition.partition_columns(); - - let regions = partitions - .iter() - .map(|x| x.id.region_number()) - .collect::>(); - - // TODO(LFC): Serializing and deserializing partition rule is ugly, must find a much more elegant way. - let partition_rule: PartitionRuleRef = match partition_columns.len() { - 1 => { - // Omit the last "MAXVALUE". - let bounds = partitions - .iter() - .filter_map(|info| match &info.partition.partition_bounds()[0] { - PartitionBound::Value(v) => Some(v.clone()), - PartitionBound::MaxValue => None, - PartitionBound::Expr(_) => None, - }) - .collect::>(); - Arc::new(RangePartitionRule::new( - partition_columns[0].clone(), - bounds, - regions, - )) as _ - } - _ => { - let bounds = partitions - .iter() - .map(|x| x.partition.partition_bounds().clone()) - .collect::>>(); - Arc::new(RangeColumnsPartitionRule::new( - partition_columns.clone(), - bounds, - regions, - )) as _ - } - }; - Ok(partition_rule) - } - - /// Find regions in partition rule by filters. - pub fn find_regions_by_filters( - &self, - partition_rule: PartitionRuleRef, - filters: &[Expr], - ) -> Result> { - let regions = if let Some((first, rest)) = filters.split_first() { - let mut target = find_regions0(partition_rule.clone(), first)?; - for filter in rest { - let regions = find_regions0(partition_rule.clone(), filter)?; - - // When all filters are provided as a collection, it often implicitly states that - // "all filters must be satisfied". So we join all the results here. - target.retain(|x| regions.contains(x)); - - // Failed fast, empty collection join any is empty. - if target.is_empty() { - break; - } - } - target.into_iter().collect::>() - } else { - partition_rule.find_regions_by_exprs(&[])? - }; - ensure!( - !regions.is_empty(), - error::FindRegionsSnafu { - filters: filters.to_vec() - } - ); - Ok(regions) - } - pub async fn find_region_leader(&self, region_id: RegionId) -> Result { let region_routes = &self .find_physical_table_route(region_id.table_id()) @@ -324,56 +238,3 @@ fn create_partitions_from_region_routes( Ok(partitions) } - -fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result> { - match filter { - Expr::BinaryExpr(BinaryExpr { left, op, right }) if op.supports_propagation() => { - let column_op_value = match (left.as_ref(), right.as_ref()) { - (Expr::Column(c), Expr::Literal(v)) => Some((&c.name, *op, v)), - (Expr::Literal(v), Expr::Column(c)) => Some(( - &c.name, - // Safety: previous branch ensures this is a comparison operator - op.swap().unwrap(), - v, - )), - _ => None, - }; - if let Some((column, op, scalar)) = column_op_value { - let value = Value::try_from(scalar.clone()).with_context(|_| { - error::ConvertScalarValueSnafu { - value: scalar.clone(), - } - })?; - return Ok(partition_rule - .find_regions_by_exprs(&[PartitionExpr::new(column, op, value)])? - .into_iter() - .collect::>()); - } - } - Expr::BinaryExpr(BinaryExpr { left, op, right }) - if matches!(op, Operator::And | Operator::Or) => - { - let left_regions = find_regions0(partition_rule.clone(), &left.clone())?; - let right_regions = find_regions0(partition_rule.clone(), &right.clone())?; - let regions = match op { - Operator::And => left_regions - .intersection(&right_regions) - .cloned() - .collect::>(), - Operator::Or => left_regions - .union(&right_regions) - .cloned() - .collect::>(), - _ => unreachable!(), - }; - return Ok(regions); - } - _ => (), - } - - // Returns all regions for not supported partition expr as a safety hatch. - Ok(partition_rule - .find_regions_by_exprs(&[])? - .into_iter() - .collect::>()) -} diff --git a/src/partition/src/multi_dim.rs b/src/partition/src/multi_dim.rs index e8841470d7..f47d71f98b 100644 --- a/src/partition/src/multi_dim.rs +++ b/src/partition/src/multi_dim.rs @@ -148,13 +148,6 @@ impl PartitionRule for MultiDimPartitionRule { fn find_region(&self, values: &[Value]) -> Result { self.find_region(values) } - - fn find_regions_by_exprs( - &self, - _exprs: &[crate::partition::PartitionExpr], - ) -> Result> { - Ok(self.regions.clone()) - } } /// Helper for [RuleChecker] diff --git a/src/partition/src/partition.rs b/src/partition/src/partition.rs index 28cda6a817..ac965034c6 100644 --- a/src/partition/src/partition.rs +++ b/src/partition/src/partition.rs @@ -37,11 +37,6 @@ pub trait PartitionRule: Sync + Send { /// /// Note that the `values` should have the same length as the `partition_columns`. fn find_region(&self, values: &[Value]) -> Result; - - /// Finds the target regions by the partition expressions. - /// - /// Note that the `exprs` should have the same length as the `partition_columns`. - fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result>; } /// The right bound(exclusive) of partition range. diff --git a/src/partition/src/range.rs b/src/partition/src/range.rs deleted file mode 100644 index 26639d7ffe..0000000000 --- a/src/partition/src/range.rs +++ /dev/null @@ -1,252 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; - -use datafusion_expr::Operator; -use datatypes::prelude::*; -use serde::{Deserialize, Serialize}; -use snafu::OptionExt; -use store_api::storage::RegionNumber; - -use crate::error::{self, Error}; -use crate::partition::{PartitionExpr, PartitionRule}; - -/// [RangePartitionRule] manages the distribution of partitions partitioning by some column's value -/// range. It's generated from create table request, using MySQL's syntax: -/// -/// ```SQL -/// CREATE TABLE table_name ( -/// columns definition -/// ) -/// PARTITION BY RANGE (column_name) ( -/// PARTITION partition_name VALUES LESS THAN (value)[, -/// PARTITION partition_name VALUES LESS THAN (value)][, -/// ...] -/// ) -/// ``` -/// -/// Please refer to MySQL's ["RANGE Partitioning"](https://dev.mysql.com/doc/refman/8.0/en/partitioning-range.html) -/// document for more details. -/// -/// Some partition related validations like: -/// - the column used in partitioning must be defined in the create table request -/// - partition name must be unique -/// - range bounds(the "value"s) must be strictly increased -/// - the last partition range must be bounded by "MAXVALUE" -/// -/// are all been done in the create table SQL parsing stage. So we can safely skip some checks on the -/// input arguments. -/// -/// # Important Notes on Partition and Region -/// -/// Technically, table "partition" is a concept of data sharding logically, i.e., how table's data are -/// distributed in logic. And "region" is of how data been placed physically. They should be used -/// in different ways. -/// -/// However, currently we have only one region for each partition. For the sake of simplicity, the -/// terms "partition" and "region" are used interchangeably. -/// -// TODO(LFC): Further clarify "partition" and "region". -// Could be creating an extra layer between partition and region. -#[derive(Debug, Serialize, Deserialize)] -pub struct RangePartitionRule { - column_name: String, - // Does not store the last "MAXVALUE" bound; because in this way our binary search in finding - // partitions are easier (besides, it's hard to represent "MAXVALUE" in our `Value`). - // Then the length of `bounds` is one less than `regions`. - bounds: Vec, - regions: Vec, -} - -impl RangePartitionRule { - pub fn new( - column_name: impl Into, - bounds: Vec, - regions: Vec, - ) -> Self { - Self { - column_name: column_name.into(), - bounds, - regions, - } - } - - pub fn column_name(&self) -> &String { - &self.column_name - } - - pub fn all_regions(&self) -> &Vec { - &self.regions - } - - pub fn bounds(&self) -> &Vec { - &self.bounds - } -} - -impl PartitionRule for RangePartitionRule { - fn as_any(&self) -> &dyn Any { - self - } - - fn partition_columns(&self) -> Vec { - vec![self.column_name().to_string()] - } - - fn find_region(&self, values: &[Value]) -> Result { - debug_assert_eq!( - values.len(), - 1, - "RangePartitionRule can only handle one partition value, actual {}", - values.len() - ); - let value = &values[0]; - - Ok(match self.bounds.binary_search(value) { - Ok(i) => self.regions[i + 1], - Err(i) => self.regions[i], - }) - } - - fn find_regions_by_exprs(&self, exprs: &[PartitionExpr]) -> Result, Error> { - if exprs.is_empty() { - return Ok(self.regions.clone()); - } - debug_assert_eq!( - exprs.len(), - 1, - "RangePartitionRule can only handle one partition expr, actual {}", - exprs.len() - ); - - let PartitionExpr { column, op, value } = - exprs.first().context(error::FindRegionSnafu { - reason: "no partition expr is provided", - })?; - let regions = if column == self.column_name() { - // an example of bounds and regions: - // SQL: - // PARTITION p1 VALUES LESS THAN (10), - // PARTITION p2 VALUES LESS THAN (20), - // PARTITION p3 VALUES LESS THAN (50), - // PARTITION p4 VALUES LESS THAN (MAXVALUE), - // bounds: [10, 20, 50] - // regions: [1, 2, 3, 4] - match self.bounds.binary_search(value) { - Ok(i) => match op { - Operator::Lt => &self.regions[..=i], - Operator::LtEq => &self.regions[..=(i + 1)], - Operator::Eq => &self.regions[(i + 1)..=(i + 1)], - Operator::Gt | Operator::GtEq => &self.regions[(i + 1)..], - Operator::NotEq => &self.regions[..], - _ => unimplemented!(), - }, - Err(i) => match op { - Operator::Lt | Operator::LtEq => &self.regions[..=i], - Operator::Eq => &self.regions[i..=i], - Operator::Gt | Operator::GtEq => &self.regions[i..], - Operator::NotEq => &self.regions[..], - _ => unimplemented!(), - }, - } - .to_vec() - } else { - self.all_regions().clone() - }; - Ok(regions) - } -} - -#[cfg(test)] -mod test { - use datafusion_expr::Operator; - - use super::*; - use crate::partition::PartitionExpr; - - #[test] - fn test_find_regions() { - // PARTITION BY RANGE (a) ( - // PARTITION p1 VALUES LESS THAN ('hz'), - // PARTITION p2 VALUES LESS THAN ('sh'), - // PARTITION p3 VALUES LESS THAN ('sz'), - // PARTITION p4 VALUES LESS THAN (MAXVALUE), - // ) - let rule = RangePartitionRule { - column_name: "a".to_string(), - bounds: vec!["hz".into(), "sh".into(), "sz".into()], - regions: vec![1, 2, 3, 4], - }; - - let test = - |column: &str, op: Operator, value: &str, expected_regions: Vec| { - let expr = PartitionExpr { - column: column.to_string(), - op, - value: value.into(), - }; - let regions = rule.find_regions_by_exprs(&[expr]).unwrap(); - assert_eq!( - regions, - expected_regions.into_iter().collect::>() - ); - }; - - test("a", Operator::NotEq, "hz", vec![1, 2, 3, 4]); - test("a", Operator::NotEq, "what", vec![1, 2, 3, 4]); - - test("a", Operator::GtEq, "ab", vec![1, 2, 3, 4]); - test("a", Operator::GtEq, "hz", vec![2, 3, 4]); - test("a", Operator::GtEq, "ijk", vec![2, 3, 4]); - test("a", Operator::GtEq, "sh", vec![3, 4]); - test("a", Operator::GtEq, "ssh", vec![3, 4]); - test("a", Operator::GtEq, "sz", vec![4]); - test("a", Operator::GtEq, "zz", vec![4]); - - test("a", Operator::Gt, "ab", vec![1, 2, 3, 4]); - test("a", Operator::Gt, "hz", vec![2, 3, 4]); - test("a", Operator::Gt, "ijk", vec![2, 3, 4]); - test("a", Operator::Gt, "sh", vec![3, 4]); - test("a", Operator::Gt, "ssh", vec![3, 4]); - test("a", Operator::Gt, "sz", vec![4]); - test("a", Operator::Gt, "zz", vec![4]); - - test("a", Operator::Eq, "ab", vec![1]); - test("a", Operator::Eq, "hz", vec![2]); - test("a", Operator::Eq, "ijk", vec![2]); - test("a", Operator::Eq, "sh", vec![3]); - test("a", Operator::Eq, "ssh", vec![3]); - test("a", Operator::Eq, "sz", vec![4]); - test("a", Operator::Eq, "zz", vec![4]); - - test("a", Operator::Lt, "ab", vec![1]); - test("a", Operator::Lt, "hz", vec![1]); - test("a", Operator::Lt, "ijk", vec![1, 2]); - test("a", Operator::Lt, "sh", vec![1, 2]); - test("a", Operator::Lt, "ssh", vec![1, 2, 3]); - test("a", Operator::Lt, "sz", vec![1, 2, 3]); - test("a", Operator::Lt, "zz", vec![1, 2, 3, 4]); - - test("a", Operator::LtEq, "ab", vec![1]); - test("a", Operator::LtEq, "hz", vec![1, 2]); - test("a", Operator::LtEq, "ijk", vec![1, 2]); - test("a", Operator::LtEq, "sh", vec![1, 2, 3]); - test("a", Operator::LtEq, "ssh", vec![1, 2, 3]); - test("a", Operator::LtEq, "sz", vec![1, 2, 3, 4]); - test("a", Operator::LtEq, "zz", vec![1, 2, 3, 4]); - - test("b", Operator::Lt, "1", vec![1, 2, 3, 4]); - } -} diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index 73a5a01892..f62210a6b5 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -139,7 +139,6 @@ mod tests { use serde::{Deserialize, Serialize}; use super::*; - use crate::partition::PartitionExpr; use crate::PartitionRule; fn mock_rows() -> Rows { @@ -210,10 +209,6 @@ mod tests { Ok(val.parse::().unwrap() % 2) } - - fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result> { - unimplemented!() - } } #[derive(Debug, Serialize, Deserialize)] @@ -237,10 +232,6 @@ mod tests { Ok(val) } - - fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result> { - unimplemented!() - } } #[derive(Debug, Serialize, Deserialize)] @@ -258,10 +249,6 @@ mod tests { fn find_region(&self, _values: &[Value]) -> Result { Ok(0) } - - fn find_regions_by_exprs(&self, _: &[PartitionExpr]) -> Result> { - unimplemented!() - } } #[test]