From 028a69e34945834d3850941a636e5a43316cd46c Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 1 Feb 2023 19:24:49 +0800 Subject: [PATCH] refactor: move partition related code to partition manager (#906) * wip: fix compile errors * chore: move splitter to partition crate * fix: remove useless variants in frontend errors * chore: move more partition related code to partition manager * fix: license header * wip: move WriteSplitter to PartitionRuleManager * fix: clippy warnings * chore: remove useless error variant and format toml * fix: cr comments * chore: resolve conflicts * chore: rebase develop * fix: cr comments --- Cargo.lock | 20 + Cargo.toml | 1 + src/frontend/Cargo.toml | 1 + src/frontend/src/catalog.rs | 24 +- src/frontend/src/error.rs | 134 ++---- src/frontend/src/instance.rs | 7 +- src/frontend/src/instance/distributed.rs | 17 +- src/frontend/src/lib.rs | 2 - src/frontend/src/table.rs | 408 ++++-------------- src/frontend/src/table/insert.rs | 11 +- src/frontend/src/tests.rs | 9 +- src/partition/Cargo.toml | 22 + .../partitioning => partition/src}/columns.rs | 20 +- src/partition/src/error.rs | 155 +++++++ src/partition/src/lib.rs | 25 ++ src/partition/src/manager.rs | 282 ++++++++++++ .../src/partition.rs} | 34 +- .../partitioning => partition/src}/range.rs | 21 +- .../src/table => partition/src}/route.rs | 14 +- .../spliter.rs => partition/src/splitter.rs} | 58 ++- 20 files changed, 709 insertions(+), 556 deletions(-) create mode 100644 src/partition/Cargo.toml rename src/{frontend/src/partitioning => partition/src}/columns.rs (97%) create mode 100644 src/partition/src/error.rs create mode 100644 src/partition/src/lib.rs create mode 100644 src/partition/src/manager.rs rename src/{frontend/src/partitioning.rs => partition/src/partition.rs} (88%) rename src/{frontend/src/partitioning => partition/src}/range.rs (96%) rename src/{frontend/src/table => partition/src}/route.rs (86%) rename src/{frontend/src/spliter.rs => partition/src/splitter.rs} (90%) diff --git a/Cargo.lock b/Cargo.lock index dbb67be715..a01b444949 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2687,6 +2687,7 @@ dependencies = [ "meta-srv", "moka", "openmetrics-parser", + "partition", "prost 0.11.6", "query", "rustls", @@ -4715,6 +4716,25 @@ dependencies = [ "regex", ] +[[package]] +name = "partition" +version = "0.1.0" +dependencies = [ + "common-error", + "common-query", + "datafusion", + "datafusion-common", + "datafusion-expr", + "datatypes", + "meta-client", + "moka", + "serde", + "serde_json", + "snafu", + "store-api", + "table", +] + [[package]] name = "paste" version = "1.0.11" diff --git a/Cargo.toml b/Cargo.toml index 8eb6855006..7d61010864 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ members = [ "src/meta-srv", "src/mito", "src/object-store", + "src/partition", "src/promql", "src/query", "src/script", diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 91c49d730a..904a18dd4f 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -32,6 +32,7 @@ itertools = "0.10" meta-client = { path = "../meta-client" } moka = { version = "0.9", features = ["future"] } openmetrics-parser = "0.4" +partition = { path = "../partition" } prost.workspace = true query = { path = "../query" } rustls = "0.20" diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index b610fa24f9..b7a7926df9 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -29,29 +29,29 @@ use catalog::{ }; use 
futures::StreamExt; use meta_client::rpc::TableName; +use partition::manager::PartitionRuleManagerRef; use snafu::prelude::*; use table::TableRef; use crate::datanode::DatanodeClients; -use crate::table::route::TableRoutes; use crate::table::DistTable; #[derive(Clone)] pub struct FrontendCatalogManager { backend: KvBackendRef, - table_routes: Arc, + partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, } impl FrontendCatalogManager { pub(crate) fn new( backend: KvBackendRef, - table_routes: Arc, + partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, ) -> Self { Self { backend, - table_routes, + partition_manager, datanode_clients, } } @@ -61,8 +61,8 @@ impl FrontendCatalogManager { } #[cfg(test)] - pub(crate) fn table_routes(&self) -> Arc { - self.table_routes.clone() + pub(crate) fn partition_manager(&self) -> PartitionRuleManagerRef { + self.partition_manager.clone() } #[cfg(test)] @@ -173,7 +173,7 @@ impl CatalogList for FrontendCatalogManager { Ok(Some(Arc::new(FrontendCatalogProvider { catalog_name: name.to_string(), backend: self.backend.clone(), - table_routes: self.table_routes.clone(), + partition_manager: self.partition_manager.clone(), datanode_clients: self.datanode_clients.clone(), }))) } else { @@ -185,7 +185,7 @@ impl CatalogList for FrontendCatalogManager { pub struct FrontendCatalogProvider { catalog_name: String, backend: KvBackendRef, - table_routes: Arc, + partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, } @@ -232,7 +232,7 @@ impl CatalogProvider for FrontendCatalogProvider { catalog_name: self.catalog_name.clone(), schema_name: name.to_string(), backend: self.backend.clone(), - table_routes: self.table_routes.clone(), + partition_manager: self.partition_manager.clone(), datanode_clients: self.datanode_clients.clone(), }))) } else { @@ -245,7 +245,7 @@ pub struct FrontendSchemaProvider { catalog_name: String, schema_name: String, backend: KvBackendRef, - table_routes: Arc, + partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, } @@ -286,7 +286,7 @@ impl SchemaProvider for FrontendSchemaProvider { }; let backend = self.backend.clone(); - let table_routes = self.table_routes.clone(); + let partition_manager = self.partition_manager.clone(); let datanode_clients = self.datanode_clients.clone(); let table_name = TableName::new(&self.catalog_name, &self.schema_name, name); let result: Result, catalog::error::Error> = std::thread::spawn(|| { @@ -306,7 +306,7 @@ impl SchemaProvider for FrontendSchemaProvider { .try_into() .context(catalog_err::InvalidTableInfoInCatalogSnafu)?, ), - table_routes, + partition_manager, datanode_clients, backend, )); diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index a71240c540..4498ba580c 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -15,9 +15,6 @@ use std::any::Any; use common_error::prelude::*; -use common_query::logical_plan::Expr; -use datafusion_common::ScalarValue; -use datatypes::prelude::Value; use store_api::storage::RegionId; #[derive(Debug, Snafu)] @@ -99,35 +96,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Failed to convert DataFusion's ScalarValue: {:?}, source: {}", - value, - source - ))] - ConvertScalarValue { - value: ScalarValue, - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - - #[snafu(display("Failed to find partition column: {}", column_name))] - FindPartitionColumn { - column_name: String, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to find region, reason: {}", 
reason))] - FindRegion { - reason: String, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to find regions by filters: {:?}", filters))] - FindRegions { - filters: Vec, - backtrace: Backtrace, - }, - #[snafu(display("Failed to find Datanode by region: {:?}", region))] FindDatanode { region: RegionId, @@ -140,13 +108,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Expect {} region keys, actual {}", expect, actual))] - RegionKeysSize { - expect: usize, - actual: usize, - backtrace: Backtrace, - }, - #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String, @@ -201,16 +162,17 @@ pub enum Error { source: meta_client::error::Error, }, - #[snafu(display("Failed to get cache, error: {}", err_msg))] - GetCache { - err_msg: String, + #[snafu(display("Failed to create table route for table {}", table_name))] + CreateTableRoute { + table_name: String, backtrace: Backtrace, }, - #[snafu(display("Failed to find table routes for table {}", table_name))] - FindTableRoutes { + #[snafu(display("Failed to find table route for table {}", table_name))] + FindTableRoute { table_name: String, - backtrace: Backtrace, + #[snafu(backtrace)] + source: partition::error::Error, }, #[snafu(display("Failed to create AlterExpr from Alter statement, source: {}", source))] @@ -252,24 +214,12 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display("Failed to find region routes for table {}", table_name))] - FindRegionRoutes { + #[snafu(display("Failed to find region route for table {}", table_name))] + FindRegionRoute { table_name: String, backtrace: Backtrace, }, - #[snafu(display("Failed to serialize value to json, source: {}", source))] - SerializeJson { - source: serde_json::Error, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to deserialize value from json, source: {}", source))] - DeserializeJson { - source: serde_json::Error, - backtrace: Backtrace, - }, - #[snafu(display( "Failed to find leader peer for region {} in table {}", region, @@ -281,28 +231,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Failed to find partition info for region {} in table {}", - region, - table_name - ))] - FindRegionPartition { - region: u64, - table_name: String, - backtrace: Backtrace, - }, - - #[snafu(display( - "Illegal table routes data for table {}, error message: {}", - table_name, - err_msg - ))] - IllegalTableRoutesData { - table_name: String, - err_msg: String, - backtrace: Backtrace, - }, - #[snafu(display("Cannot find primary key column by name: {}", msg))] PrimaryKeyNotFound { msg: String, backtrace: Backtrace }, @@ -345,17 +273,6 @@ pub enum Error { source: substrait::error::Error, }, - #[snafu(display( - "Failed to build a vector from values, value: {}, source: {}", - value, - source - ))] - BuildVector { - value: Value, - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display("Failed to found context value: {}", key))] ContextValueNotFound { key: String, backtrace: Backtrace }, @@ -404,6 +321,15 @@ pub enum Error { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display( + "Failed to deserialize partition in meta to partition def, source: {}", + source + ))] + DeserializePartition { + #[snafu(backtrace)] + source: partition::error::Error, + }, } pub type Result = std::result::Result; @@ -413,19 +339,15 @@ impl ErrorExt for Error { match self { Error::ParseAddr { .. } | Error::InvalidSql { .. } - | Error::FindRegion { .. } - | Error::FindRegions { .. } | Error::InvalidInsertRequest { .. 
} - | Error::FindPartitionColumn { .. } - | Error::ColumnValuesNumberMismatch { .. } - | Error::RegionKeysSize { .. } => StatusCode::InvalidArguments, + | Error::ColumnValuesNumberMismatch { .. } => StatusCode::InvalidArguments, Error::NotSupported { .. } => StatusCode::Unsupported, Error::RuntimeResource { source, .. } => source.status_code(), - Error::StartServer { source, .. } => source.status_code(), Error::SqlExecIntercepted { source, .. } => source.status_code(), + Error::StartServer { source, .. } => source.status_code(), Error::ParseSql { source } | Error::AlterExprFromStmt { source } => { source.status_code() @@ -433,8 +355,7 @@ impl ErrorExt for Error { Error::Table { source } => source.status_code(), - Error::ConvertColumnDefaultConstraint { source, .. } - | Error::ConvertScalarValue { source, .. } => source.status_code(), + Error::ConvertColumnDefaultConstraint { source, .. } => source.status_code(), Error::RequestDatanode { source } => source.status_code(), @@ -443,14 +364,9 @@ impl ErrorExt for Error { } Error::FindDatanode { .. } - | Error::GetCache { .. } - | Error::FindTableRoutes { .. } - | Error::SerializeJson { .. } - | Error::DeserializeJson { .. } - | Error::FindRegionRoutes { .. } + | Error::CreateTableRoute { .. } + | Error::FindRegionRoute { .. } | Error::FindLeaderPeer { .. } - | Error::FindRegionPartition { .. } - | Error::IllegalTableRoutesData { .. } | Error::BuildDfLogicalPlan { .. } | Error::BuildTableMeta { .. } => StatusCode::Internal, @@ -482,10 +398,12 @@ impl ErrorExt for Error { Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable, Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists, Error::EncodeSubstraitLogicalPlan { source } => source.status_code(), - Error::BuildVector { source, .. } => source.status_code(), Error::InvokeDatanode { source } => source.status_code(), Error::ColumnDefaultValue { source, .. } => source.status_code(), Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments, + Error::DeserializePartition { source, .. } | Error::FindTableRoute { source, .. 
} => { + source.status_code() + } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 812573e23e..d4603076ed 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -40,6 +40,8 @@ use datanode::instance::InstanceRef as DnInstanceRef; use distributed::DistInstance; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; +use partition::manager::PartitionRuleManager; +use partition::route::TableRoutes; use servers::error as server_error; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; @@ -60,7 +62,6 @@ use crate::error::{self, Error, MissingMetasrvOptsSnafu, Result}; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler}; -use crate::table::route::TableRoutes; use crate::Plugins; #[async_trait] @@ -104,10 +105,12 @@ impl Instance { client: meta_client.clone(), }); let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); + let partition_manager = Arc::new(PartitionRuleManager::new(table_routes)); let datanode_clients = Arc::new(DatanodeClients::new()); + let catalog_manager = Arc::new(FrontendCatalogManager::new( meta_backend, - table_routes, + partition_manager, datanode_clients.clone(), )); diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 1e8bde0459..897d48cd86 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -34,6 +34,7 @@ use meta_client::rpc::{ CreateRequest as MetaCreateRequest, Partition as MetaPartition, PutRequest, RouteResponse, TableName, TableRoute, }; +use partition::partition::{PartitionBound, PartitionDef}; use query::parser::QueryStatement; use query::sql::{describe_table, explain, show_databases, show_tables}; use query::{QueryEngineFactory, QueryEngineRef}; @@ -51,13 +52,12 @@ use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogNotFoundSnafu, CatalogSnafu, - ColumnDataTypeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu, RequestDatanodeSnafu, - RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu, TableNotFoundSnafu, - TableSnafu, ToTableInsertRequestSnafu, + ColumnDataTypeSnafu, DeserializePartitionSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu, + RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu, + TableNotFoundSnafu, TableSnafu, ToTableInsertRequestSnafu, }; use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory}; use crate::instance::parse_stmt; -use crate::partitioning::{PartitionBound, PartitionDef}; use crate::sql::insert_to_request; #[derive(Clone)] @@ -92,7 +92,7 @@ impl DistInstance { let table_routes = response.table_routes; ensure!( table_routes.len() == 1, - error::FindTableRoutesSnafu { + error::CreateTableRouteSnafu { table_name: create_table.table_name.to_string() } ); @@ -107,7 +107,7 @@ impl DistInstance { let region_routes = &table_route.region_routes; ensure!( !region_routes.is_empty(), - error::FindRegionRoutesSnafu { + error::FindRegionRouteSnafu { table_name: create_table.table_name.to_string() } ); @@ -509,8 +509,9 @@ fn parse_partitions( partition_entries .into_iter() - .map(|x| PartitionDef::new(partition_columns.clone(), 
x).try_into()) - .collect::>>() + .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x))) + .collect::>() + .context(DeserializePartitionSnafu) } fn find_partition_entries( diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 14dd7328b3..d34f24c8a1 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -26,11 +26,9 @@ pub mod influxdb; pub mod instance; pub mod mysql; pub mod opentsdb; -pub mod partitioning; pub mod postgres; pub mod prometheus; mod server; -pub mod spliter; mod sql; mod table; #[cfg(test)] diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index c8919bbb8e..08f5a9113f 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -12,10 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod route; - use std::any::Any; -use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::v1::AlterExpr; @@ -37,13 +34,10 @@ use datafusion::physical_plan::{ Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream, }; use datafusion_common::DataFusionError; -use datafusion_expr::expr::Expr as DfExpr; -use datafusion_expr::BinaryExpr; -use datatypes::prelude::Value; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use meta_client::rpc::{Peer, TableName}; +use meta_client::rpc::TableName; +use partition::manager::PartitionRuleManagerRef; use snafu::prelude::*; -use store_api::storage::RegionNumber; use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef}; use table::requests::{AlterTableRequest, InsertRequest}; @@ -52,17 +46,7 @@ use table::Table; use tokio::sync::RwLock; use crate::datanode::DatanodeClients; -use crate::error::{ - self, BuildTableMetaSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ContextValueNotFoundSnafu, - Error, LeaderNotFoundSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu, TableSnafu, -}; -use crate::partitioning::columns::RangeColumnsPartitionRule; -use crate::partitioning::range::RangePartitionRule; -use crate::partitioning::{ - Operator, PartitionBound, PartitionDef, PartitionExpr, PartitionRuleRef, -}; -use crate::spliter::WriteSpliter; -use crate::table::route::TableRoutes; +use crate::error::{self, Result}; use crate::table::scan::{DatanodeInstance, TableScanPlan}; pub mod insert; @@ -72,7 +56,7 @@ pub(crate) mod scan; pub struct DistTable { table_name: TableName, table_info: TableInfoRef, - table_routes: Arc, + partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, backend: KvBackendRef, } @@ -92,20 +76,15 @@ impl Table for DistTable { } async fn insert(&self, request: InsertRequest) -> table::Result { - let partition_rule = self - .find_partition_rule() + let split = self + .partition_manager + .split_insert_request(&self.table_name, request) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; - let spliter = WriteSpliter::with_partition_rule(partition_rule); - let inserts = spliter - .split(request) - .map_err(BoxedError::new) - .context(TableOperationSnafu)?; - let output = self - .dist_insert(inserts) + .dist_insert(split) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; @@ -120,17 +99,20 @@ impl Table for DistTable { limit: Option, ) -> table::Result { let partition_rule = self - .find_partition_rule() + .partition_manager + .find_table_partition_rule(&self.table_name) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; let regions = self - 
.find_regions(partition_rule, filters) + .partition_manager + .find_regions_by_filters(partition_rule, filters) .map_err(BoxedError::new) .context(TableOperationSnafu)?; let datanodes = self - .find_datanodes(regions) + .partition_manager + .find_region_datanodes(&self.table_name, regions) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; @@ -175,217 +157,19 @@ impl DistTable { pub(crate) fn new( table_name: TableName, table_info: TableInfoRef, - table_routes: Arc, + partition_manager: PartitionRuleManagerRef, datanode_clients: Arc, backend: KvBackendRef, ) -> Self { Self { table_name, table_info, - table_routes, + partition_manager, datanode_clients, backend, } } - // TODO(LFC): Finding regions now seems less efficient, should be further looked into. - fn find_regions( - &self, - partition_rule: PartitionRuleRef, - filters: &[Expr], - ) -> Result> { - let regions = if let Some((first, rest)) = filters.split_first() { - let mut target = Self::find_regions0(partition_rule.clone(), first)?; - for filter in rest { - let regions = Self::find_regions0(partition_rule.clone(), filter)?; - - // When all filters are provided as a collection, it often implicitly states that - // "all filters must be satisfied". So we join all the results here. - target.retain(|x| regions.contains(x)); - - // Failed fast, empty collection join any is empty. - if target.is_empty() { - break; - } - } - target.into_iter().collect::>() - } else { - partition_rule.find_regions(&[])? - }; - ensure!( - !regions.is_empty(), - error::FindRegionsSnafu { - filters: filters.to_vec() - } - ); - Ok(regions) - } - - // TODO(LFC): Support other types of filter expr: - // - BETWEEN and IN (maybe more) - // - expr with arithmetic like "a + 1 < 10" (should have been optimized in logic plan?) - // - not comparison or neither "AND" nor "OR" operations, for example, "a LIKE x" - fn find_regions0( - partition_rule: PartitionRuleRef, - filter: &Expr, - ) -> Result> { - let expr = filter.df_expr(); - match expr { - DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if is_compare_op(op) => { - let column_op_value = match (left.as_ref(), right.as_ref()) { - (DfExpr::Column(c), DfExpr::Literal(v)) => Some((&c.name, *op, v)), - (DfExpr::Literal(v), DfExpr::Column(c)) => { - Some((&c.name, reverse_operator(op), v)) - } - _ => None, - }; - if let Some((column, op, sv)) = column_op_value { - let value = sv - .clone() - .try_into() - .with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?; - return Ok(partition_rule - .find_regions(&[PartitionExpr::new(column, op, value)])? - .into_iter() - .collect::>()); - } - } - DfExpr::BinaryExpr(BinaryExpr { left, op, right }) - if matches!(op, Operator::And | Operator::Or) => - { - let left_regions = - Self::find_regions0(partition_rule.clone(), &(*left.clone()).into())?; - let right_regions = - Self::find_regions0(partition_rule.clone(), &(*right.clone()).into())?; - let regions = match op { - Operator::And => left_regions - .intersection(&right_regions) - .cloned() - .collect::>(), - Operator::Or => left_regions - .union(&right_regions) - .cloned() - .collect::>(), - _ => unreachable!(), - }; - return Ok(regions); - } - _ => (), - } - - // Returns all regions for not supported partition expr as a safety hatch. - Ok(partition_rule - .find_regions(&[])? 
- .into_iter() - .collect::>()) - } - - async fn find_datanodes( - &self, - regions: Vec, - ) -> Result>> { - let route = self.table_routes.get_route(&self.table_name).await?; - - let mut datanodes = HashMap::new(); - for region in regions.iter() { - let datanode = route - .region_routes - .iter() - .find_map(|x| { - if x.region.id == *region as u64 { - x.leader_peer.clone() - } else { - None - } - }) - .context(error::FindDatanodeSnafu { region: *region })?; - datanodes - .entry(datanode) - .or_insert_with(Vec::new) - .push(*region); - } - Ok(datanodes) - } - - async fn find_partition_rule(&self) -> Result> { - let route = self.table_routes.get_route(&self.table_name).await?; - ensure!( - !route.region_routes.is_empty(), - error::FindRegionRoutesSnafu { - table_name: self.table_name.to_string() - } - ); - - let mut partitions = Vec::with_capacity(route.region_routes.len()); - for r in route.region_routes.iter() { - let partition = - r.region - .partition - .clone() - .context(error::FindRegionPartitionSnafu { - region: r.region.id, - table_name: self.table_name.to_string(), - })?; - let partition_def: PartitionDef = partition.try_into()?; - partitions.push((r.region.id, partition_def)); - } - partitions.sort_by(|a, b| a.1.partition_bounds().cmp(b.1.partition_bounds())); - - ensure!( - partitions - .windows(2) - .all(|w| w[0].1.partition_columns() == w[1].1.partition_columns()), - error::IllegalTableRoutesDataSnafu { - table_name: self.table_name.to_string(), - err_msg: "partition columns of all regions are not the same" - } - ); - let partition_columns = partitions[0].1.partition_columns(); - ensure!( - !partition_columns.is_empty(), - error::IllegalTableRoutesDataSnafu { - table_name: self.table_name.to_string(), - err_msg: "no partition columns found" - } - ); - - let regions = partitions - .iter() - .map(|x| x.0 as u32) - .collect::>(); - - // TODO(LFC): Serializing and deserializing partition rule is ugly, must find a much more elegant way. - let partition_rule: PartitionRuleRef = match partition_columns.len() { - 1 => { - // Omit the last "MAXVALUE". - let bounds = partitions - .iter() - .filter_map(|(_, p)| match &p.partition_bounds()[0] { - PartitionBound::Value(v) => Some(v.clone()), - PartitionBound::MaxValue => None, - }) - .collect::>(); - Arc::new(RangePartitionRule::new( - partition_columns[0].clone(), - bounds, - regions, - )) as _ - } - _ => { - let bounds = partitions - .iter() - .map(|x| x.1.partition_bounds().clone()) - .collect::>>(); - Arc::new(RangeColumnsPartitionRule::new( - partition_columns.clone(), - bounds, - regions, - )) as _ - } - }; - Ok(partition_rule) - } - pub(crate) async fn table_global_value( &self, key: &TableGlobalKey, @@ -394,9 +178,9 @@ impl DistTable { .backend .get(key.to_string().as_bytes()) .await - .context(CatalogSnafu)?; + .context(error::CatalogSnafu)?; Ok(if let Some(raw) = raw { - Some(TableGlobalValue::from_bytes(raw.1).context(CatalogEntrySerdeSnafu)?) + Some(TableGlobalValue::from_bytes(raw.1).context(error::CatalogEntrySerdeSnafu)?) 
} else { None }) @@ -407,17 +191,17 @@ impl DistTable { key: TableGlobalKey, value: TableGlobalValue, ) -> Result<()> { - let value = value.as_bytes().context(CatalogEntrySerdeSnafu)?; + let value = value.as_bytes().context(error::CatalogEntrySerdeSnafu)?; self.backend .set(key.to_string().as_bytes(), &value) .await - .context(CatalogSnafu) + .context(error::CatalogSnafu) } async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> { let alter_expr = context .get::() - .context(ContextValueNotFoundSnafu { key: "AlterExpr" })?; + .context(error::ContextValueNotFoundSnafu { key: "AlterExpr" })?; self.alter_by_expr(alter_expr).await?; @@ -426,9 +210,9 @@ impl DistTable { let new_meta = table_info .meta .builder_with_alter_kind(table_name, &request.alter_kind) - .context(TableSnafu)? + .context(error::TableSnafu)? .build() - .context(BuildTableMetaSnafu { + .context(error::BuildTableMetaSnafu { table_name: table_name.clone(), })?; @@ -441,12 +225,12 @@ impl DistTable { schema_name: alter_expr.schema_name.clone(), table_name: alter_expr.table_name.clone(), }; - let mut value = self - .table_global_value(&key) - .await? - .context(TableNotFoundSnafu { - table_name: alter_expr.table_name.clone(), - })?; + let mut value = + self.table_global_value(&key) + .await? + .context(error::TableNotFoundSnafu { + table_name: alter_expr.table_name.clone(), + })?; value.table_info = new_info.into(); @@ -456,11 +240,17 @@ impl DistTable { /// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between /// [`table::requests::AlterTableRequest`] and [`AlterExpr`]. async fn alter_by_expr(&self, expr: &AlterExpr) -> Result<()> { - let table_routes = self.table_routes.get_route(&self.table_name).await?; + let table_routes = self + .partition_manager + .find_table_route(&self.table_name) + .await + .with_context(|_| error::FindTableRouteSnafu { + table_name: self.table_name.to_string(), + })?; let leaders = table_routes.find_leaders(); ensure!( !leaders.is_empty(), - LeaderNotFoundSnafu { + error::LeaderNotFoundSnafu { table: format!( "{:?}.{:?}.{}", expr.catalog_name, expr.schema_name, expr.table_name @@ -473,7 +263,10 @@ impl DistTable { self.datanode_clients.get_client(&datanode).await, ); debug!("Sending {:?} to {:?}", expr, db); - let result = db.alter(expr.clone()).await.context(RequestDatanodeSnafu)?; + let result = db + .alter(expr.clone()) + .await + .context(error::RequestDatanodeSnafu)?; debug!("Alter table result: {:?}", result); // TODO(hl): We should further check and track alter result in some global DDL task tracker } @@ -494,28 +287,6 @@ fn project_schema(table_schema: SchemaRef, projection: Option<&Vec>) -> S } } -fn is_compare_op(op: &Operator) -> bool { - matches!( - *op, - Operator::Eq - | Operator::NotEq - | Operator::Lt - | Operator::LtEq - | Operator::Gt - | Operator::GtEq - ) -} - -fn reverse_operator(op: &Operator) -> Operator { - match *op { - Operator::Lt => Operator::Gt, - Operator::Gt => Operator::Lt, - Operator::LtEq => Operator::GtEq, - Operator::GtEq => Operator::LtEq, - _ => *op, - } -} - #[derive(Debug)] struct DistTableScan { schema: SchemaRef, @@ -604,6 +375,8 @@ impl PartitionExec { #[cfg(test)] mod test { + use std::collections::HashMap; + use api::v1::column::SemanticType; use api::v1::{column, Column, ColumnDataType, InsertRequest}; use catalog::error::Result; @@ -617,7 +390,7 @@ mod test { use datafusion::prelude::SessionContext; use datafusion::sql::sqlparser; use datafusion_expr::expr_fn::{and, 
binary_expr, col, or}; - use datafusion_expr::lit; + use datafusion_expr::{lit, Operator}; use datanode::instance::Instance; use datatypes::arrow::compute::SortOptions; use datatypes::prelude::ConcreteDataType; @@ -626,14 +399,20 @@ mod test { use meta_client::client::MetaClient; use meta_client::rpc::router::RegionRoute; use meta_client::rpc::{Region, Table, TableRoute}; + use partition::columns::RangeColumnsPartitionRule; + use partition::manager::PartitionRuleManager; + use partition::partition::{PartitionBound, PartitionDef}; + use partition::range::RangePartitionRule; + use partition::route::TableRoutes; + use partition::PartitionRuleRef; use sql::parser::ParserContext; use sql::statements::statement::Statement; + use store_api::storage::RegionNumber; use table::metadata::{TableInfoBuilder, TableMetaBuilder}; use table::TableRef; use super::*; use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory}; - use crate::partitioning::range::RangePartitionRule; struct DummyKvBackend; @@ -667,33 +446,8 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_find_partition_rule() { let table_name = TableName::new("greptime", "public", "foo"); - - let column_schemas = vec![ - ColumnSchema::new("ts", ConcreteDataType::uint64_datatype(), false), - ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), - ColumnSchema::new("b", ConcreteDataType::string_datatype(), true), - ]; - let schema = Arc::new(Schema::new(column_schemas.clone())); - let meta = TableMetaBuilder::default() - .schema(schema) - .primary_key_indices(vec![]) - .next_column_id(1) - .build() - .unwrap(); - let table_info = TableInfoBuilder::default() - .name(&table_name.table_name) - .meta(meta) - .build() - .unwrap(); - let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))); - let table = DistTable { - table_name: table_name.clone(), - table_info: Arc::new(table_info), - table_routes: table_routes.clone(), - datanode_clients: Arc::new(DatanodeClients::new()), - backend: Arc::new(DummyKvBackend), - }; + let partition_manager = Arc::new(PartitionRuleManager::new(table_routes.clone())); let table_route = TableRoute { table: Table { @@ -759,7 +513,10 @@ mod test { .insert_table_route(table_name.clone(), Arc::new(table_route)) .await; - let partition_rule = table.find_partition_rule().await.unwrap(); + let partition_rule = partition_manager + .find_table_partition_rule(&table_name) + .await + .unwrap(); let range_rule = partition_rule .as_any() .downcast_ref::() @@ -838,7 +595,10 @@ mod test { .insert_table_route(table_name.clone(), Arc::new(table_route)) .await; - let partition_rule = table.find_partition_rule().await.unwrap(); + let partition_rule = partition_manager + .find_table_partition_rule(&table_name) + .await + .unwrap(); let range_columns_rule = partition_rule .as_any() .downcast_ref::() @@ -1035,7 +795,7 @@ mod test { let datanode_instances = instance.datanodes; let catalog_manager = dist_instance.catalog_manager(); - let table_routes = catalog_manager.table_routes(); + let partition_manager = catalog_manager.partition_manager(); let datanode_clients = catalog_manager.datanode_clients(); let table_name = TableName::new("greptime", "public", "dist_numbers"); @@ -1074,7 +834,10 @@ mod test { .await .unwrap(); - let table_route = table_routes.get_route(&table_name).await.unwrap(); + let table_route = partition_manager + .find_table_route(&table_name) + .await + .unwrap(); let mut region_to_datanode_mapping = HashMap::new(); for region_route in 
table_route.region_routes.iter() { @@ -1114,7 +877,7 @@ mod test { DistTable { table_name, table_info: Arc::new(table_info), - table_routes, + partition_manager, datanode_clients, backend: catalog_manager.backend(), } @@ -1169,30 +932,9 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_find_regions() { - let schema = Arc::new(Schema::new(vec![ColumnSchema::new( - "a", - ConcreteDataType::int32_datatype(), - true, - )])); - let table_name = TableName::new("greptime", "public", "foo"); - let meta = TableMetaBuilder::default() - .schema(schema) - .primary_key_indices(vec![]) - .next_column_id(1) - .build() - .unwrap(); - let table_info = TableInfoBuilder::default() - .name(&table_name.table_name) - .meta(meta) - .build() - .unwrap(); - let table = DistTable { - table_name, - table_info: Arc::new(table_info), - table_routes: Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))), - datanode_clients: Arc::new(DatanodeClients::new()), - backend: Arc::new(DummyKvBackend), - }; + let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new( + Arc::new(MetaClient::default()), + )))); // PARTITION BY RANGE (a) ( // PARTITION r1 VALUES LESS THAN (10), @@ -1200,18 +942,18 @@ mod test { // PARTITION r3 VALUES LESS THAN (50), // PARTITION r4 VALUES LESS THAN (MAXVALUE), // ) - let partition_rule: PartitionRuleRef = Arc::new(RangePartitionRule::new( + let partition_rule: PartitionRuleRef = Arc::new(RangePartitionRule::new( "a", vec![10_i32.into(), 20_i32.into(), 50_i32.into()], vec![0_u32, 1, 2, 3], )) as _; + let partition_rule_clone = partition_rule.clone(); let test = |filters: Vec, expect_regions: Vec| { - let mut regions = table - .find_regions(partition_rule.clone(), filters.as_slice()) + let mut regions = partition_manager + .find_regions_by_filters(partition_rule_clone.clone(), filters.as_slice()) .unwrap(); regions.sort(); - assert_eq!(regions, expect_regions); }; @@ -1298,7 +1040,7 @@ mod test { ); // test failed to find regions by contradictory filters - let regions = table.find_regions( + let regions = partition_manager.find_regions_by_filters( partition_rule, vec![and( binary_expr(col("a"), Operator::Lt, lit(20)), @@ -1309,7 +1051,7 @@ mod test { ); // a < 20 AND a >= 20 assert!(matches!( regions.unwrap_err(), - error::Error::FindRegions { .. } + partition::error::Error::FindRegions { .. 
} )); } } diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index 3996edb796..011bf5557e 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -27,7 +27,7 @@ use table::requests::InsertRequest; use super::DistTable; use crate::error; -use crate::error::Result; +use crate::error::{FindTableRouteSnafu, Result}; use crate::table::scan::DatanodeInstance; impl DistTable { @@ -35,8 +35,13 @@ impl DistTable { &self, inserts: HashMap, ) -> Result { - let route = self.table_routes.get_route(&self.table_name).await?; - + let route = self + .partition_manager + .find_table_route(&self.table_name) + .await + .with_context(|_| FindTableRouteSnafu { + table_name: self.table_name.to_string(), + })?; let mut joins = Vec::with_capacity(inserts.len()); for (region_id, insert) in inserts { let datanode = route diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 3249000605..e2d5e116c9 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -28,6 +28,8 @@ use meta_srv::metasrv::MetaSrvOptions; use meta_srv::mocks::MockInfo; use meta_srv::service::store::kv::KvStoreRef; use meta_srv::service::store::memory::MemStore; +use partition::manager::PartitionRuleManager; +use partition::route::TableRoutes; use servers::grpc::GrpcServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::Mode; @@ -39,7 +41,6 @@ use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::instance::distributed::DistInstance; use crate::instance::Instance; -use crate::table::route::TableRoutes; /// Guard against the `TempDir`s that used in unit tests. /// (The `TempDir` will be deleted once it goes out of scope.) @@ -241,10 +242,12 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistribu let meta_backend = Arc::new(MetaKvBackend { client: meta_client.clone(), }); - let table_routes = Arc::new(TableRoutes::new(meta_client.clone())); + let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new( + meta_client.clone(), + )))); let catalog_manager = Arc::new(FrontendCatalogManager::new( meta_backend, - table_routes.clone(), + partition_manager, datanode_clients.clone(), )); diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml new file mode 100644 index 0000000000..e0f7e8866d --- /dev/null +++ b/src/partition/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "partition" +version.workspace = true +edition.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +common-error = { path = "../common/error" } +common-query = { path = "../common/query" } +datafusion.workspace = true +datafusion-common.workspace = true +datafusion-expr.workspace = true +datatypes = { path = "../datatypes" } +meta-client = { path = "../meta-client" } +moka = { version = "0.9", features = ["future"] } +snafu.workspace = true +store-api = { path = "../store-api" } +serde.workspace = true +serde_json = "1.0" +table = { path = "../table" } diff --git a/src/frontend/src/partitioning/columns.rs b/src/partition/src/columns.rs similarity index 97% rename from src/frontend/src/partitioning/columns.rs rename to src/partition/src/columns.rs index 6e8e739c13..4c5c13b33a 100644 --- a/src/frontend/src/partitioning/columns.rs +++ b/src/partition/src/columns.rs @@ -20,7 +20,7 @@ use snafu::ensure; use store_api::storage::RegionNumber; use crate::error::{self, 
Error}; -use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule}; +use crate::partition::{PartitionBound, PartitionExpr, PartitionRule}; /// A [RangeColumnsPartitionRule] is very similar to [RangePartitionRule] except that it allows /// partitioning by multiple columns. @@ -77,7 +77,7 @@ pub struct RangeColumnsPartitionRule { impl RangeColumnsPartitionRule { // It's assured that input arguments are valid because they are checked in SQL parsing stage. // So we can skip validating them. - pub(crate) fn new( + pub fn new( column_list: Vec, value_lists: Vec>, regions: Vec, @@ -123,25 +123,20 @@ impl RangeColumnsPartitionRule { } } - #[cfg(test)] - pub(crate) fn column_list(&self) -> &Vec { + pub fn column_list(&self) -> &Vec { &self.column_list } - #[cfg(test)] - pub(crate) fn value_lists(&self) -> &Vec> { + pub fn value_lists(&self) -> &Vec> { &self.value_lists } - #[cfg(test)] - pub(crate) fn regions(&self) -> &Vec { + pub fn regions(&self) -> &Vec { &self.regions } } impl PartitionRule for RangeColumnsPartitionRule { - type Error = Error; - fn as_any(&self) -> &dyn Any { self } @@ -150,7 +145,7 @@ impl PartitionRule for RangeColumnsPartitionRule { self.column_list.clone() } - fn find_region(&self, values: &[Value]) -> Result { + fn find_region(&self, values: &[Value]) -> Result { ensure!( values.len() == self.column_list.len(), error::RegionKeysSizeSnafu { @@ -171,7 +166,7 @@ impl PartitionRule for RangeColumnsPartitionRule { }) } - fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error> { + fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Error> { let regions = if exprs.iter().all(|x| self.column_list.contains(&x.column)) { let PartitionExpr { column: _, @@ -220,6 +215,7 @@ mod tests { use std::assert_matches::assert_matches; use super::*; + use crate::partition::{PartitionBound, PartitionExpr}; #[test] fn test_find_regions() { diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs new file mode 100644 index 0000000000..f3b4a8bdea --- /dev/null +++ b/src/partition/src/error.rs @@ -0,0 +1,155 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::Any; + +use common_error::prelude::*; +use common_query::prelude::Expr; +use datafusion_common::ScalarValue; +use snafu::Snafu; +use store_api::storage::RegionId; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Failed to get cache, error: {}", err_msg))] + GetCache { + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to request Meta, source: {}", source))] + RequestMeta { + #[snafu(backtrace)] + source: meta_client::error::Error, + }, + + #[snafu(display("Failed to find Datanode, table: {} region: {:?}", table, region))] + FindDatanode { + table: String, + region: RegionId, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to find table routes for table {}", table_name))] + FindTableRoutes { + table_name: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to find region routes for table {}, region id: {}", + table_name, + region_id + ))] + FindRegionRoutes { + table_name: String, + region_id: u64, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to serialize value to json, source: {}", source))] + SerializeJson { + source: serde_json::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to deserialize value from json, source: {}", source))] + DeserializeJson { + source: serde_json::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Expect {} region keys, actual {}", expect, actual))] + RegionKeysSize { + expect: usize, + actual: usize, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to find region, reason: {}", reason))] + FindRegion { + reason: String, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to find regions by filters: {:?}", filters))] + FindRegions { + filters: Vec, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to find partition column: {}", column_name))] + FindPartitionColumn { + column_name: String, + backtrace: Backtrace, + }, + + #[snafu(display("Invalid InsertRequest, reason: {}", reason))] + InvalidInsertRequest { + reason: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Invalid table route data in meta, table name: {}, msg: {}", + table_name, + err_msg + ))] + InvalidTableRouteData { + table_name: String, + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to convert DataFusion's ScalarValue: {:?}, source: {}", + value, + source + ))] + ConvertScalarValue { + value: ScalarValue, + #[snafu(backtrace)] + source: datatypes::error::Error, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::GetCache { .. } => StatusCode::StorageUnavailable, + Error::FindRegionRoutes { .. } => StatusCode::InvalidArguments, + Error::FindTableRoutes { .. } => StatusCode::InvalidArguments, + Error::RequestMeta { source, .. } => source.status_code(), + Error::FindRegion { .. } + | Error::FindRegions { .. } + | Error::RegionKeysSize { .. } + | Error::InvalidInsertRequest { .. } + | Error::FindPartitionColumn { .. } => StatusCode::InvalidArguments, + Error::SerializeJson { .. } | Error::DeserializeJson { .. } => StatusCode::Internal, + Error::InvalidTableRouteData { .. } => StatusCode::Internal, + Error::ConvertScalarValue { .. } => StatusCode::Internal, + Error::FindDatanode { .. 
} => StatusCode::InvalidArguments, + } + } + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub type Result = std::result::Result; diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs new file mode 100644 index 0000000000..731abbd45e --- /dev/null +++ b/src/partition/src/lib.rs @@ -0,0 +1,25 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![feature(assert_matches)] + +pub mod columns; +pub mod error; +pub mod manager; +pub mod partition; +pub mod range; +pub mod route; +pub mod splitter; + +pub use crate::partition::{PartitionRule, PartitionRuleRef}; diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs new file mode 100644 index 0000000000..7c7e3e3472 --- /dev/null +++ b/src/partition/src/manager.rs @@ -0,0 +1,282 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use common_query::prelude::Expr; +use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; +use datatypes::prelude::Value; +use meta_client::rpc::{Peer, TableName, TableRoute}; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::storage::RegionNumber; +use table::requests::InsertRequest; + +use crate::columns::RangeColumnsPartitionRule; +use crate::error::Result; +use crate::partition::{PartitionBound, PartitionDef, PartitionExpr}; +use crate::range::RangePartitionRule; +use crate::route::TableRoutes; +use crate::splitter::{InsertRequestSplit, WriteSplitter}; +use crate::{error, PartitionRuleRef}; + +pub type PartitionRuleManagerRef = Arc; + +/// PartitionRuleManager manages the table routes and partition rules. +/// It provides methods to find regions by: +/// - values (in case of insertion) +/// - filters (in case of select, deletion and update) +pub struct PartitionRuleManager { + table_routes: Arc, +} + +impl PartitionRuleManager { + pub fn new(table_routes: Arc) -> Self { + Self { table_routes } + } + + /// Find table route of given table name. + pub async fn find_table_route(&self, table: &TableName) -> Result> { + self.table_routes.get_route(table).await + } + + /// Find datanodes of corresponding regions of given table. 
+ pub async fn find_region_datanodes( + &self, + table: &TableName, + regions: Vec, + ) -> Result>> { + let route = self.table_routes.get_route(table).await?; + let mut datanodes = HashMap::with_capacity(regions.len()); + for region in regions.iter() { + let datanode = route + .region_routes + .iter() + .find_map(|x| { + if x.region.id == *region as u64 { + x.leader_peer.clone() + } else { + None + } + }) + .context(error::FindDatanodeSnafu { + table: table.to_string(), + region: *region, + })?; + datanodes + .entry(datanode) + .or_insert_with(Vec::new) + .push(*region); + } + Ok(datanodes) + } + + /// Get partition rule of given table. + pub async fn find_table_partition_rule(&self, table: &TableName) -> Result { + let route = self.table_routes.get_route(table).await?; + ensure!( + !route.region_routes.is_empty(), + error::FindTableRoutesSnafu { + table_name: table.to_string() + } + ); + + let mut partitions = Vec::with_capacity(route.region_routes.len()); + for r in route.region_routes.iter() { + let partition = r + .region + .partition + .clone() + .context(error::FindRegionRoutesSnafu { + region_id: r.region.id, + table_name: table.to_string(), + })?; + let partition_def = PartitionDef::try_from(partition)?; + partitions.push((r.region.id, partition_def)); + } + partitions.sort_by(|a, b| a.1.partition_bounds().cmp(b.1.partition_bounds())); + + ensure!( + partitions + .windows(2) + .all(|w| w[0].1.partition_columns() == w[1].1.partition_columns()), + error::InvalidTableRouteDataSnafu { + table_name: table.to_string(), + err_msg: "partition columns of all regions are not the same" + } + ); + let partition_columns = partitions[0].1.partition_columns(); + ensure!( + !partition_columns.is_empty(), + error::InvalidTableRouteDataSnafu { + table_name: table.to_string(), + err_msg: "no partition columns found" + } + ); + + let regions = partitions + .iter() + .map(|x| x.0 as u32) + .collect::>(); + + // TODO(LFC): Serializing and deserializing partition rule is ugly, must find a much more elegant way. + let partition_rule: PartitionRuleRef = match partition_columns.len() { + 1 => { + // Omit the last "MAXVALUE". + let bounds = partitions + .iter() + .filter_map(|(_, p)| match &p.partition_bounds()[0] { + PartitionBound::Value(v) => Some(v.clone()), + PartitionBound::MaxValue => None, + }) + .collect::>(); + Arc::new(RangePartitionRule::new( + partition_columns[0].clone(), + bounds, + regions, + )) as _ + } + _ => { + let bounds = partitions + .iter() + .map(|x| x.1.partition_bounds().clone()) + .collect::>>(); + Arc::new(RangeColumnsPartitionRule::new( + partition_columns.clone(), + bounds, + regions, + )) as _ + } + }; + Ok(partition_rule) + } + + /// Find regions in partition rule by filters. + pub fn find_regions_by_filters( + &self, + partition_rule: PartitionRuleRef, + filters: &[Expr], + ) -> Result> { + let regions = if let Some((first, rest)) = filters.split_first() { + let mut target = find_regions0(partition_rule.clone(), first)?; + for filter in rest { + let regions = find_regions0(partition_rule.clone(), filter)?; + + // When all filters are provided as a collection, it often implicitly states that + // "all filters must be satisfied". So we join all the results here. + target.retain(|x| regions.contains(x)); + + // Failed fast, empty collection join any is empty. + if target.is_empty() { + break; + } + } + target.into_iter().collect::>() + } else { + partition_rule.find_regions(&[])? 
+ }; + ensure!( + !regions.is_empty(), + error::FindRegionsSnafu { + filters: filters.to_vec() + } + ); + Ok(regions) + } + + /// Split [InsertRequest] into [InsertRequestSplit] according to the partition rule + /// of given table. + pub async fn split_insert_request( + &self, + table: &TableName, + req: InsertRequest, + ) -> Result { + let partition_rule = self.find_table_partition_rule(table).await.unwrap(); + let splitter = WriteSplitter::with_partition_rule(partition_rule); + splitter.split_insert(req) + } +} + +fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result> { + let expr = filter.df_expr(); + match expr { + DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if is_compare_op(op) => { + let column_op_value = match (left.as_ref(), right.as_ref()) { + (DfExpr::Column(c), DfExpr::Literal(v)) => Some((&c.name, *op, v)), + (DfExpr::Literal(v), DfExpr::Column(c)) => Some((&c.name, reverse_operator(op), v)), + _ => None, + }; + if let Some((column, op, scalar)) = column_op_value { + let value = Value::try_from(scalar.clone()).with_context(|_| { + error::ConvertScalarValueSnafu { + value: scalar.clone(), + } + })?; + return Ok(partition_rule + .find_regions(&[PartitionExpr::new(column, op, value)])? + .into_iter() + .collect::>()); + } + } + DfExpr::BinaryExpr(BinaryExpr { left, op, right }) + if matches!(op, Operator::And | Operator::Or) => + { + let left_regions = find_regions0(partition_rule.clone(), &(*left.clone()).into())?; + let right_regions = find_regions0(partition_rule.clone(), &(*right.clone()).into())?; + let regions = match op { + Operator::And => left_regions + .intersection(&right_regions) + .cloned() + .collect::>(), + Operator::Or => left_regions + .union(&right_regions) + .cloned() + .collect::>(), + _ => unreachable!(), + }; + return Ok(regions); + } + _ => (), + } + + // Returns all regions for not supported partition expr as a safety hatch. + Ok(partition_rule + .find_regions(&[])? + .into_iter() + .collect::>()) +} + +#[inline] +fn is_compare_op(op: &Operator) -> bool { + matches!( + *op, + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq + ) +} + +#[inline] +fn reverse_operator(op: &Operator) -> Operator { + match *op { + Operator::Lt => Operator::Gt, + Operator::Gt => Operator::Lt, + Operator::LtEq => Operator::GtEq, + Operator::GtEq => Operator::LtEq, + _ => *op, + } +} diff --git a/src/frontend/src/partitioning.rs b/src/partition/src/partition.rs similarity index 88% rename from src/frontend/src/partitioning.rs rename to src/partition/src/partition.rs index 3f47cc4261..9bd16c548a 100644 --- a/src/frontend/src/partitioning.rs +++ b/src/partition/src/partition.rs @@ -12,14 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-pub(crate) mod columns; -pub(crate) mod range; - use std::any::Any; use std::fmt::Debug; use std::sync::Arc; -pub use datafusion_expr::Operator; +use datafusion_expr::Operator; use datatypes::prelude::Value; use meta_client::rpc::Partition as MetaPartition; use serde::{Deserialize, Serialize}; @@ -28,51 +25,46 @@ use store_api::storage::RegionNumber; use crate::error::{self, Error}; -pub(crate) type PartitionRuleRef = Arc>; +pub type PartitionRuleRef = Arc; pub trait PartitionRule: Sync + Send { - type Error: Debug; - fn as_any(&self) -> &dyn Any; fn partition_columns(&self) -> Vec; // TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop. // Or find better names since one is mainly for writes and the other is for reads. - fn find_region(&self, values: &[Value]) -> Result; + fn find_region(&self, values: &[Value]) -> Result; - fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error>; + fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Error>; } /// The right bound(exclusive) of partition range. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -pub(crate) enum PartitionBound { +pub enum PartitionBound { Value(Value), MaxValue, } #[derive(Debug)] -pub(crate) struct PartitionDef { +pub struct PartitionDef { partition_columns: Vec, partition_bounds: Vec, } impl PartitionDef { - pub(crate) fn new( - partition_columns: Vec, - partition_bounds: Vec, - ) -> Self { + pub fn new(partition_columns: Vec, partition_bounds: Vec) -> Self { Self { partition_columns, partition_bounds, } } - pub(crate) fn partition_columns(&self) -> &Vec { + pub fn partition_columns(&self) -> &Vec { &self.partition_columns } - pub(crate) fn partition_bounds(&self) -> &Vec { + pub fn partition_bounds(&self) -> &Vec { &self.partition_bounds } } @@ -133,13 +125,13 @@ impl TryFrom for MetaPartition { #[derive(Debug, PartialEq, Eq)] pub struct PartitionExpr { - column: String, - op: Operator, - value: Value, + pub column: String, + pub op: Operator, + pub value: Value, } impl PartitionExpr { - pub(crate) fn new(column: impl Into, op: Operator, value: Value) -> Self { + pub fn new(column: impl Into, op: Operator, value: Value) -> Self { Self { column: column.into(), op, diff --git a/src/frontend/src/partitioning/range.rs b/src/partition/src/range.rs similarity index 96% rename from src/frontend/src/partitioning/range.rs rename to src/partition/src/range.rs index a608a51542..23df5b296d 100644 --- a/src/frontend/src/partitioning/range.rs +++ b/src/partition/src/range.rs @@ -14,13 +14,14 @@ use std::any::Any; +use datafusion_expr::Operator; use datatypes::prelude::*; use serde::{Deserialize, Serialize}; use snafu::OptionExt; use store_api::storage::RegionNumber; use crate::error::{self, Error}; -use crate::partitioning::{Operator, PartitionExpr, PartitionRule}; +use crate::partition::{PartitionExpr, PartitionRule}; /// [RangePartitionRule] manages the distribution of partitions partitioning by some column's value /// range. 
diff --git a/src/frontend/src/partitioning/range.rs b/src/partition/src/range.rs
similarity index 96%
rename from src/frontend/src/partitioning/range.rs
rename to src/partition/src/range.rs
index a608a51542..23df5b296d 100644
--- a/src/frontend/src/partitioning/range.rs
+++ b/src/partition/src/range.rs
@@ -14,13 +14,14 @@
 
 use std::any::Any;
 
+use datafusion_expr::Operator;
 use datatypes::prelude::*;
 use serde::{Deserialize, Serialize};
 use snafu::OptionExt;
 use store_api::storage::RegionNumber;
 
 use crate::error::{self, Error};
-use crate::partitioning::{Operator, PartitionExpr, PartitionRule};
+use crate::partition::{PartitionExpr, PartitionRule};
 
 /// [RangePartitionRule] manages the distribution of partitions partitioning by some column's value
 /// range.
 /// It's generated from create table request, using MySQL's syntax:
@@ -70,7 +71,7 @@ pub struct RangePartitionRule {
 }
 
 impl RangePartitionRule {
-    pub(crate) fn new(
+    pub fn new(
         column_name: impl Into<String>,
         bounds: Vec<Value>,
         regions: Vec<RegionNumber>,
@@ -82,23 +83,20 @@ impl RangePartitionRule {
         }
     }
 
-    pub(crate) fn column_name(&self) -> &String {
+    pub fn column_name(&self) -> &String {
         &self.column_name
     }
 
-    pub(crate) fn all_regions(&self) -> &Vec<RegionNumber> {
+    pub fn all_regions(&self) -> &Vec<RegionNumber> {
         &self.regions
     }
 
-    #[cfg(test)]
-    pub(crate) fn bounds(&self) -> &Vec<Value> {
+    pub fn bounds(&self) -> &Vec<Value> {
         &self.bounds
     }
 }
 
 impl PartitionRule for RangePartitionRule {
-    type Error = Error;
-
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -107,7 +105,7 @@ impl PartitionRule for RangePartitionRule {
         vec![self.column_name().to_string()]
     }
 
-    fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error> {
+    fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error> {
         debug_assert_eq!(
             values.len(),
             1,
@@ -122,7 +120,7 @@ impl PartitionRule for RangePartitionRule {
         })
     }
 
-    fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Self::Error> {
+    fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
         if exprs.is_empty() {
             return Ok(self.regions.clone());
        }
@@ -173,7 +171,10 @@ impl PartitionRule for RangePartitionRule {
 
 #[cfg(test)]
 mod test {
+    use datafusion_expr::Operator;
+
     use super::*;
+    use crate::partition::PartitionExpr;
 
     #[test]
     fn test_find_regions() {
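The lookup a `RangePartitionRule` performs is essentially "find the first exclusive right bound greater than the value". A compact model of that semantics, using plain i64 keys instead of the crate's `Value` and assuming one more region than bounds so that MAXVALUE is implicit (the real rule returns an error rather than panicking on malformed input):

// `bounds` holds each partition's exclusive right bound, sorted ascending;
// `regions` has one extra trailing entry that plays the MAXVALUE role.
fn find_region(bounds: &[i64], regions: &[u32], value: i64) -> u32 {
    // partition_point counts the bounds that are <= value, which is the
    // index of the first partition whose right bound still lies ahead.
    let idx = bounds.partition_point(|b| *b <= value);
    regions[idx]
}

fn main() {
    // PARTITION p0 VALUES LESS THAN (10),
    // PARTITION p1 VALUES LESS THAN (20),
    // PARTITION p2 VALUES LESS THAN (MAXVALUE)
    let bounds = [10, 20];
    let regions = [0, 1, 2];
    assert_eq!(find_region(&bounds, &regions, 3), 0);
    assert_eq!(find_region(&bounds, &regions, 10), 1); // right bound is exclusive
    assert_eq!(find_region(&bounds, &regions, 999), 2);
}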
diff --git a/src/frontend/src/table/route.rs b/src/partition/src/route.rs
similarity index 86%
rename from src/frontend/src/table/route.rs
rename to src/partition/src/route.rs
index 68f0c7225d..c4f6008663 100644
--- a/src/frontend/src/table/route.rs
+++ b/src/partition/src/route.rs
@@ -22,13 +22,14 @@
 use snafu::{ensure, ResultExt};
 
 use crate::error::{self, Result};
 
-pub(crate) struct TableRoutes {
+pub struct TableRoutes {
     meta_client: Arc<MetaClient>,
     cache: Cache<TableName, Arc<TableRoute>>,
 }
 
+// TODO(hl): maybe periodically refresh table route cache?
 impl TableRoutes {
-    pub(crate) fn new(meta_client: Arc<MetaClient>) -> Self {
+    pub fn new(meta_client: Arc<MetaClient>) -> Self {
         Self {
             meta_client,
             cache: CacheBuilder::new(1024)
@@ -38,7 +39,7 @@ impl TableRoutes {
         }
     }
 
-    pub(crate) async fn get_route(&self, table_name: &TableName) -> Result<Arc<TableRoute>> {
+    pub async fn get_route(&self, table_name: &TableName) -> Result<Arc<TableRoute>> {
         self.cache
             .try_get_with_by_ref(table_name, self.get_from_meta(table_name))
             .await
@@ -68,12 +69,7 @@ impl TableRoutes {
         Ok(Arc::new(route))
     }
 
-    #[cfg(test)]
-    pub(crate) async fn insert_table_route(
-        &self,
-        table_name: TableName,
-        table_route: Arc<TableRoute>,
-    ) {
+    pub async fn insert_table_route(&self, table_name: TableName, table_route: Arc<TableRoute>) {
         self.cache.insert(table_name, table_route).await
     }
 }
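`TableRoutes` is a read-through cache: `get_route` delegates to moka's `try_get_with_by_ref`, which coalesces concurrent misses so only one caller pays for the meta-server round trip while the rest await the same result. A standalone sketch of that pattern against moka 0.9 on a tokio runtime (`fetch_route_from_meta` is a hypothetical stand-in for the meta client call, and `try_get_with` is used here in place of the by-ref variant):

use std::sync::Arc;
use std::time::Duration;

use moka::future::{Cache, CacheBuilder};

#[derive(Debug)]
struct Error(String);

// Hypothetical stand-in for the meta client round trip.
async fn fetch_route_from_meta(table: &str) -> Result<String, Error> {
    Ok(format!("route-of-{table}"))
}

#[tokio::main]
async fn main() {
    // Same shape as `TableRoutes`: a bounded cache with a TTL, so stale
    // routes age out even without the TODO'd active refresh.
    let cache: Cache<String, Arc<String>> = CacheBuilder::new(1024)
        .time_to_live(Duration::from_secs(30))
        .build();

    let table = "demo".to_string();
    // On a miss the init future runs once; concurrent callers for the
    // same key await its outcome instead of issuing duplicate lookups.
    let route = cache
        .try_get_with(table.clone(), async {
            fetch_route_from_meta(&table).await.map(Arc::new)
        })
        .await
        .expect("meta lookup failed");
    assert_eq!(route.as_str(), "route-of-demo");
}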
diff --git a/src/frontend/src/spliter.rs b/src/partition/src/splitter.rs
similarity index 90%
rename from src/frontend/src/spliter.rs
rename to src/partition/src/splitter.rs
index 5c64722497..0fcccb3f23 100644
--- a/src/frontend/src/spliter.rs
+++ b/src/partition/src/splitter.rs
@@ -20,34 +20,33 @@ use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
 use snafu::{ensure, OptionExt};
 use store_api::storage::RegionNumber;
-use table::requests::InsertRequest;
+use table::requests::{DeleteRequest, InsertRequest};
 
-use crate::error::{
-    Error, FindPartitionColumnSnafu, FindRegionSnafu, InvalidInsertRequestSnafu, Result,
-};
-use crate::partitioning::PartitionRuleRef;
+use crate::error::{FindPartitionColumnSnafu, FindRegionSnafu, InvalidInsertRequestSnafu, Result};
+use crate::PartitionRuleRef;
 
-pub type DistInsertRequest = HashMap<RegionNumber, InsertRequest>;
+pub type InsertRequestSplit = HashMap<RegionNumber, InsertRequest>;
+pub type DeleteRequestSplit = HashMap<RegionNumber, DeleteRequest>;
 
-pub struct WriteSpliter {
-    partition_rule: PartitionRuleRef<Error>,
-}
+pub struct WriteSplitter {
+    partition_rule: PartitionRuleRef,
+}
 
-impl WriteSpliter {
-    pub fn with_partition_rule(rule: PartitionRuleRef<Error>) -> Self {
+impl WriteSplitter {
+    pub fn with_partition_rule(rule: PartitionRuleRef) -> Self {
         Self {
             partition_rule: rule,
         }
     }
 
-    pub fn split(&self, insert: InsertRequest) -> Result<DistInsertRequest> {
+    pub fn split_insert(&self, insert: InsertRequest) -> Result<InsertRequestSplit> {
         check_req(&insert)?;
 
         let column_names = self.partition_rule.partition_columns();
         let partition_columns = find_partitioning_values(&insert, &column_names)?;
         let region_map = self.split_partitioning_values(&partition_columns)?;
 
-        Ok(partition_insert_request(&insert, region_map))
+        Ok(split_insert_request(&insert, region_map))
     }
 
     fn split_partitioning_values(
@@ -80,10 +79,6 @@ impl WriteSpliter {
 }
 
 fn check_req(insert: &InsertRequest) -> Result<()> {
-    check_vectors_len(insert)
-}
-
-fn check_vectors_len(insert: &InsertRequest) -> Result<()> {
     let mut len: Option<usize> = None;
     for vector in insert.columns_values.values() {
         match len {
@@ -123,10 +118,10 @@ fn partition_values(partition_columns: &[VectorRef], idx: usize) -> Vec<Value> {
         .collect()
 }
 
-fn partition_insert_request(
+fn split_insert_request(
     insert: &InsertRequest,
     region_map: HashMap<RegionNumber, Vec<usize>>,
-) -> DistInsertRequest {
+) -> InsertRequestSplit {
     let mut dist_insert: HashMap<RegionNumber, HashMap<&str, Box<dyn MutableVector>>> =
         HashMap::with_capacity(region_map.len());
@@ -185,7 +180,6 @@ mod tests {
     use std::result::Result;
     use std::sync::Arc;
 
-    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use datatypes::data_type::ConcreteDataType;
     use datatypes::prelude::ScalarVectorBuilder;
     use datatypes::types::StringType;
@@ -198,11 +192,11 @@ mod tests {
     use table::requests::InsertRequest;
 
     use super::{
-        check_req, find_partitioning_values, partition_insert_request, partition_values,
-        WriteSpliter,
+        check_req, find_partitioning_values, partition_values, split_insert_request, WriteSplitter,
     };
     use crate::error::Error;
-    use crate::partitioning::{PartitionExpr, PartitionRule, PartitionRuleRef};
+    use crate::partition::{PartitionExpr, PartitionRule};
+    use crate::PartitionRuleRef;
 
     #[test]
     fn test_insert_req_check() {
@@ -218,9 +212,9 @@ mod tests {
     #[test]
     fn test_writer_spliter() {
         let insert = mock_insert_request();
-        let rule = Arc::new(MockPartitionRule) as PartitionRuleRef<Error>;
-        let spliter = WriteSpliter::with_partition_rule(rule);
-        let ret = spliter.split(insert).unwrap();
+        let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
+        let splitter = WriteSplitter::with_partition_rule(rule);
+        let ret = splitter.split_insert(insert).unwrap();
 
         assert_eq!(2, ret.len());
 
@@ -277,7 +271,7 @@ mod tests {
         region_map.insert(1, vec![2, 0]);
         region_map.insert(2, vec![1]);
 
-        let dist_insert = partition_insert_request(&insert, region_map);
+        let dist_insert = split_insert_request(&insert, region_map);
 
         let r1_insert = dist_insert.get(&1_u32).unwrap();
         assert_eq!("demo", r1_insert.table_name);
@@ -402,8 +396,8 @@ mod tests {
         columns_values.insert("id".to_string(), builder.to_vector());
 
         InsertRequest {
-            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
-            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
+            catalog_name: "greptime".to_string(),
+            schema_name: "public".to_string(),
             table_name: "demo".to_string(),
             columns_values,
         }
@@ -429,8 +423,8 @@ mod tests {
         columns_values.insert("id".to_string(), builder.to_vector());
 
         InsertRequest {
-            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
-            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
+            catalog_name: "greptime".to_string(),
+            schema_name: "public".to_string(),
             table_name: "demo".to_string(),
             columns_values,
         }
@@ -444,8 +438,6 @@ mod tests {
     // PARTITION r1 VALUES IN(2, 3),
     // );
     impl PartitionRule for MockPartitionRule {
-        type Error = Error;
-
         fn as_any(&self) -> &dyn Any {
             self
         }
@@ -454,7 +446,7 @@ mod tests {
             vec!["id".to_string()]
         }
 
-        fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error> {
+        fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error> {
            let val = values.get(0).unwrap().to_owned();
            let id_1: Value = 1_i16.into();
            let id_2: Value = 2_i16.into();