refactor: move partition related code to partition manager (#906)

* wip: fix compile errors

* chore: move splitter to partition crate

* fix: remove useless variants in frontend errors

* chore: move more partition related code to partition manager

* fix: license header

* wip: move WriteSplitter to PartitionRuleManager

* fix: clippy warnings

* chore: remove useless error variant and format toml

* fix: cr comments

* chore: resolve conflicts

* chore: rebase develop

* fix: cr comments
Author: Lei, HUANG
Date: 2023-02-01 19:24:49 +08:00
Committed by: GitHub
Parent: 9a30ba00c4
Commit: 028a69e349

20 changed files with 709 additions and 556 deletions

Cargo.lock (generated)

@@ -2687,6 +2687,7 @@ dependencies = [
"meta-srv",
"moka",
"openmetrics-parser",
"partition",
"prost 0.11.6",
"query",
"rustls",
@@ -4715,6 +4716,25 @@ dependencies = [
"regex",
]
[[package]]
name = "partition"
version = "0.1.0"
dependencies = [
"common-error",
"common-query",
"datafusion",
"datafusion-common",
"datafusion-expr",
"datatypes",
"meta-client",
"moka",
"serde",
"serde_json",
"snafu",
"store-api",
"table",
]
[[package]]
name = "paste"
version = "1.0.11"


@@ -27,6 +27,7 @@ members = [
"src/meta-srv",
"src/mito",
"src/object-store",
"src/partition",
"src/promql",
"src/query",
"src/script",


@@ -32,6 +32,7 @@ itertools = "0.10"
meta-client = { path = "../meta-client" }
moka = { version = "0.9", features = ["future"] }
openmetrics-parser = "0.4"
partition = { path = "../partition" }
prost.workspace = true
query = { path = "../query" }
rustls = "0.20"


@@ -29,29 +29,29 @@ use catalog::{
};
use futures::StreamExt;
use meta_client::rpc::TableName;
use partition::manager::PartitionRuleManagerRef;
use snafu::prelude::*;
use table::TableRef;
use crate::datanode::DatanodeClients;
use crate::table::route::TableRoutes;
use crate::table::DistTable;
#[derive(Clone)]
pub struct FrontendCatalogManager {
backend: KvBackendRef,
table_routes: Arc<TableRoutes>,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
}
impl FrontendCatalogManager {
pub(crate) fn new(
backend: KvBackendRef,
table_routes: Arc<TableRoutes>,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
Self {
backend,
table_routes,
partition_manager,
datanode_clients,
}
}
@@ -61,8 +61,8 @@ impl FrontendCatalogManager {
}
#[cfg(test)]
pub(crate) fn table_routes(&self) -> Arc<TableRoutes> {
self.table_routes.clone()
pub(crate) fn partition_manager(&self) -> PartitionRuleManagerRef {
self.partition_manager.clone()
}
#[cfg(test)]
@@ -173,7 +173,7 @@ impl CatalogList for FrontendCatalogManager {
Ok(Some(Arc::new(FrontendCatalogProvider {
catalog_name: name.to_string(),
backend: self.backend.clone(),
table_routes: self.table_routes.clone(),
partition_manager: self.partition_manager.clone(),
datanode_clients: self.datanode_clients.clone(),
})))
} else {
@@ -185,7 +185,7 @@ impl CatalogList for FrontendCatalogManager {
pub struct FrontendCatalogProvider {
catalog_name: String,
backend: KvBackendRef,
table_routes: Arc<TableRoutes>,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
}
@@ -232,7 +232,7 @@ impl CatalogProvider for FrontendCatalogProvider {
catalog_name: self.catalog_name.clone(),
schema_name: name.to_string(),
backend: self.backend.clone(),
table_routes: self.table_routes.clone(),
partition_manager: self.partition_manager.clone(),
datanode_clients: self.datanode_clients.clone(),
})))
} else {
@@ -245,7 +245,7 @@ pub struct FrontendSchemaProvider {
catalog_name: String,
schema_name: String,
backend: KvBackendRef,
table_routes: Arc<TableRoutes>,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
}
@@ -286,7 +286,7 @@ impl SchemaProvider for FrontendSchemaProvider {
};
let backend = self.backend.clone();
let table_routes = self.table_routes.clone();
let partition_manager = self.partition_manager.clone();
let datanode_clients = self.datanode_clients.clone();
let table_name = TableName::new(&self.catalog_name, &self.schema_name, name);
let result: Result<Option<TableRef>, catalog::error::Error> = std::thread::spawn(|| {
@@ -306,7 +306,7 @@ impl SchemaProvider for FrontendSchemaProvider {
.try_into()
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?,
),
table_routes,
partition_manager,
datanode_clients,
backend,
));


@@ -15,9 +15,6 @@
use std::any::Any;
use common_error::prelude::*;
use common_query::logical_plan::Expr;
use datafusion_common::ScalarValue;
use datatypes::prelude::Value;
use store_api::storage::RegionId;
#[derive(Debug, Snafu)]
@@ -99,35 +96,6 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display(
"Failed to convert DataFusion's ScalarValue: {:?}, source: {}",
value,
source
))]
ConvertScalarValue {
value: ScalarValue,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to find partition column: {}", column_name))]
FindPartitionColumn {
column_name: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to find region, reason: {}", reason))]
FindRegion {
reason: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to find regions by filters: {:?}", filters))]
FindRegions {
filters: Vec<Expr>,
backtrace: Backtrace,
},
#[snafu(display("Failed to find Datanode by region: {:?}", region))]
FindDatanode {
region: RegionId,
@@ -140,13 +108,6 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Expect {} region keys, actual {}", expect, actual))]
RegionKeysSize {
expect: usize,
actual: usize,
backtrace: Backtrace,
},
#[snafu(display("Table not found: {}", table_name))]
TableNotFound {
table_name: String,
@@ -201,16 +162,17 @@ pub enum Error {
source: meta_client::error::Error,
},
#[snafu(display("Failed to get cache, error: {}", err_msg))]
GetCache {
err_msg: String,
#[snafu(display("Failed to create table route for table {}", table_name))]
CreateTableRoute {
table_name: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to find table routes for table {}", table_name))]
FindTableRoutes {
#[snafu(display("Failed to find table route for table {}", table_name))]
FindTableRoute {
table_name: String,
backtrace: Backtrace,
#[snafu(backtrace)]
source: partition::error::Error,
},
#[snafu(display("Failed to create AlterExpr from Alter statement, source: {}", source))]
@@ -252,24 +214,12 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display("Failed to find region routes for table {}", table_name))]
FindRegionRoutes {
#[snafu(display("Failed to find region route for table {}", table_name))]
FindRegionRoute {
table_name: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to serialize value to json, source: {}", source))]
SerializeJson {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to deserialize value from json, source: {}", source))]
DeserializeJson {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to find leader peer for region {} in table {}",
region,
@@ -281,28 +231,6 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display(
"Failed to find partition info for region {} in table {}",
region,
table_name
))]
FindRegionPartition {
region: u64,
table_name: String,
backtrace: Backtrace,
},
#[snafu(display(
"Illegal table routes data for table {}, error message: {}",
table_name,
err_msg
))]
IllegalTableRoutesData {
table_name: String,
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Cannot find primary key column by name: {}", msg))]
PrimaryKeyNotFound { msg: String, backtrace: Backtrace },
@@ -345,17 +273,6 @@ pub enum Error {
source: substrait::error::Error,
},
#[snafu(display(
"Failed to build a vector from values, value: {}, source: {}",
value,
source
))]
BuildVector {
value: Value,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Failed to found context value: {}", key))]
ContextValueNotFound { key: String, backtrace: Backtrace },
@@ -404,6 +321,15 @@ pub enum Error {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display(
"Failed to deserialize partition in meta to partition def, source: {}",
source
))]
DeserializePartition {
#[snafu(backtrace)]
source: partition::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -413,19 +339,15 @@ impl ErrorExt for Error {
match self {
Error::ParseAddr { .. }
| Error::InvalidSql { .. }
| Error::FindRegion { .. }
| Error::FindRegions { .. }
| Error::InvalidInsertRequest { .. }
| Error::FindPartitionColumn { .. }
| Error::ColumnValuesNumberMismatch { .. }
| Error::RegionKeysSize { .. } => StatusCode::InvalidArguments,
| Error::ColumnValuesNumberMismatch { .. } => StatusCode::InvalidArguments,
Error::NotSupported { .. } => StatusCode::Unsupported,
Error::RuntimeResource { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Error::SqlExecIntercepted { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Error::ParseSql { source } | Error::AlterExprFromStmt { source } => {
source.status_code()
@@ -433,8 +355,7 @@ impl ErrorExt for Error {
Error::Table { source } => source.status_code(),
Error::ConvertColumnDefaultConstraint { source, .. }
| Error::ConvertScalarValue { source, .. } => source.status_code(),
Error::ConvertColumnDefaultConstraint { source, .. } => source.status_code(),
Error::RequestDatanode { source } => source.status_code(),
@@ -443,14 +364,9 @@ impl ErrorExt for Error {
}
Error::FindDatanode { .. }
| Error::GetCache { .. }
| Error::FindTableRoutes { .. }
| Error::SerializeJson { .. }
| Error::DeserializeJson { .. }
| Error::FindRegionRoutes { .. }
| Error::CreateTableRoute { .. }
| Error::FindRegionRoute { .. }
| Error::FindLeaderPeer { .. }
| Error::FindRegionPartition { .. }
| Error::IllegalTableRoutesData { .. }
| Error::BuildDfLogicalPlan { .. }
| Error::BuildTableMeta { .. } => StatusCode::Internal,
@@ -482,10 +398,12 @@ impl ErrorExt for Error {
Error::LeaderNotFound { .. } => StatusCode::StorageUnavailable,
Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Error::EncodeSubstraitLogicalPlan { source } => source.status_code(),
Error::BuildVector { source, .. } => source.status_code(),
Error::InvokeDatanode { source } => source.status_code(),
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
Error::DeserializePartition { source, .. } | Error::FindTableRoute { source, .. } => {
source.status_code()
}
}
}
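
Condensed, the wrapping pattern this hunk establishes looks like the sketch below (a minimal, hypothetical enum, not the full one): frontend variants that carry a partition::error::Error keep it as `source` and forward its status code instead of choosing their own.

use common_error::prelude::*;

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Failed to find table route for table {}", table_name))]
    FindTableRoute {
        table_name: String,
        // Carrying the source preserves its backtrace and status code.
        #[snafu(backtrace)]
        source: partition::error::Error,
    },
}

impl ErrorExt for Error {
    fn status_code(&self) -> StatusCode {
        match self {
            // Reuse whatever status the partition crate already assigned.
            Error::FindTableRoute { source, .. } => source.status_code(),
        }
    }

    fn backtrace_opt(&self) -> Option<&Backtrace> {
        ErrorCompat::backtrace(self)
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}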


@@ -40,6 +40,8 @@ use datanode::instance::InstanceRef as DnInstanceRef;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use partition::manager::PartitionRuleManager;
use partition::route::TableRoutes;
use servers::error as server_error;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
@@ -60,7 +62,6 @@ use crate::error::{self, Error, MissingMetasrvOptsSnafu, Result};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler};
use crate::table::route::TableRoutes;
use crate::Plugins;
#[async_trait]
@@ -104,10 +105,12 @@ impl Instance {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
let datanode_clients = Arc::new(DatanodeClients::new());
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
table_routes,
partition_manager,
datanode_clients.clone(),
));


@@ -34,6 +34,7 @@ use meta_client::rpc::{
CreateRequest as MetaCreateRequest, Partition as MetaPartition, PutRequest, RouteResponse,
TableName, TableRoute,
};
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement;
use query::sql::{describe_table, explain, show_databases, show_tables};
use query::{QueryEngineFactory, QueryEngineRef};
@@ -51,13 +52,12 @@ use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogNotFoundSnafu, CatalogSnafu,
ColumnDataTypeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu, RequestDatanodeSnafu,
RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu, TableNotFoundSnafu,
TableSnafu, ToTableInsertRequestSnafu,
ColumnDataTypeSnafu, DeserializePartitionSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu,
RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu,
TableNotFoundSnafu, TableSnafu, ToTableInsertRequestSnafu,
};
use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
use crate::instance::parse_stmt;
use crate::partitioning::{PartitionBound, PartitionDef};
use crate::sql::insert_to_request;
#[derive(Clone)]
@@ -92,7 +92,7 @@ impl DistInstance {
let table_routes = response.table_routes;
ensure!(
table_routes.len() == 1,
error::FindTableRoutesSnafu {
error::CreateTableRouteSnafu {
table_name: create_table.table_name.to_string()
}
);
@@ -107,7 +107,7 @@ impl DistInstance {
let region_routes = &table_route.region_routes;
ensure!(
!region_routes.is_empty(),
error::FindRegionRoutesSnafu {
error::FindRegionRouteSnafu {
table_name: create_table.table_name.to_string()
}
);
@@ -509,8 +509,9 @@ fn parse_partitions(
partition_entries
.into_iter()
.map(|x| PartitionDef::new(partition_columns.clone(), x).try_into())
.collect::<Result<Vec<MetaPartition>>>()
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
.collect::<std::result::Result<_, _>>()
.context(DeserializePartitionSnafu)
}
fn find_partition_entries(
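
The rewritten tail of parse_partitions is the standard collect-into-Result idiom. A tiny self-contained illustration with toy types (u32 and i64 standing in for MetaPartition and PartitionDef):

// Per-entry fallible conversion, like MetaPartition::try_from above;
// collecting into Result<Vec<_>, _> short-circuits on the first Err, which
// the real code then wraps once with DeserializePartitionSnafu.
fn convert_all(entries: Vec<i64>) -> Result<Vec<u32>, std::num::TryFromIntError> {
    entries.into_iter().map(u32::try_from).collect()
}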


@@ -26,11 +26,9 @@ pub mod influxdb;
pub mod instance;
pub mod mysql;
pub mod opentsdb;
pub mod partitioning;
pub mod postgres;
pub mod prometheus;
mod server;
pub mod spliter;
mod sql;
mod table;
#[cfg(test)]


@@ -12,10 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod route;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::AlterExpr;
@@ -37,13 +34,10 @@ use datafusion::physical_plan::{
Partitioning, SendableRecordBatchStream as DfSendableRecordBatchStream,
};
use datafusion_common::DataFusionError;
use datafusion_expr::expr::Expr as DfExpr;
use datafusion_expr::BinaryExpr;
use datatypes::prelude::Value;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use meta_client::rpc::{Peer, TableName};
use meta_client::rpc::TableName;
use partition::manager::PartitionRuleManagerRef;
use snafu::prelude::*;
use store_api::storage::RegionNumber;
use table::error::TableOperationSnafu;
use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef};
use table::requests::{AlterTableRequest, InsertRequest};
@@ -52,17 +46,7 @@ use table::Table;
use tokio::sync::RwLock;
use crate::datanode::DatanodeClients;
use crate::error::{
self, BuildTableMetaSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ContextValueNotFoundSnafu,
Error, LeaderNotFoundSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu, TableSnafu,
};
use crate::partitioning::columns::RangeColumnsPartitionRule;
use crate::partitioning::range::RangePartitionRule;
use crate::partitioning::{
Operator, PartitionBound, PartitionDef, PartitionExpr, PartitionRuleRef,
};
use crate::spliter::WriteSpliter;
use crate::table::route::TableRoutes;
use crate::error::{self, Result};
use crate::table::scan::{DatanodeInstance, TableScanPlan};
pub mod insert;
@@ -72,7 +56,7 @@ pub(crate) mod scan;
pub struct DistTable {
table_name: TableName,
table_info: TableInfoRef,
table_routes: Arc<TableRoutes>,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
backend: KvBackendRef,
}
@@ -92,20 +76,15 @@ impl Table for DistTable {
}
async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
let partition_rule = self
.find_partition_rule()
let split = self
.partition_manager
.split_insert_request(&self.table_name, request)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let spliter = WriteSpliter::with_partition_rule(partition_rule);
let inserts = spliter
.split(request)
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let output = self
.dist_insert(inserts)
.dist_insert(split)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
@@ -120,17 +99,20 @@ impl Table for DistTable {
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
let partition_rule = self
.find_partition_rule()
.partition_manager
.find_table_partition_rule(&self.table_name)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let regions = self
.find_regions(partition_rule, filters)
.partition_manager
.find_regions_by_filters(partition_rule, filters)
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let datanodes = self
.find_datanodes(regions)
.partition_manager
.find_region_datanodes(&self.table_name, regions)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
@@ -175,217 +157,19 @@ impl DistTable {
pub(crate) fn new(
table_name: TableName,
table_info: TableInfoRef,
table_routes: Arc<TableRoutes>,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
backend: KvBackendRef,
) -> Self {
Self {
table_name,
table_info,
table_routes,
partition_manager,
datanode_clients,
backend,
}
}
// TODO(LFC): Finding regions now seems less efficient, should be further looked into.
fn find_regions(
&self,
partition_rule: PartitionRuleRef<Error>,
filters: &[Expr],
) -> Result<Vec<RegionNumber>> {
let regions = if let Some((first, rest)) = filters.split_first() {
let mut target = Self::find_regions0(partition_rule.clone(), first)?;
for filter in rest {
let regions = Self::find_regions0(partition_rule.clone(), filter)?;
// When all filters are provided as a collection, it often implicitly states that
// "all filters must be satisfied". So we join all the results here.
target.retain(|x| regions.contains(x));
// Failed fast, empty collection join any is empty.
if target.is_empty() {
break;
}
}
target.into_iter().collect::<Vec<_>>()
} else {
partition_rule.find_regions(&[])?
};
ensure!(
!regions.is_empty(),
error::FindRegionsSnafu {
filters: filters.to_vec()
}
);
Ok(regions)
}
// TODO(LFC): Support other types of filter expr:
// - BETWEEN and IN (maybe more)
// - expr with arithmetic like "a + 1 < 10" (should have been optimized in logic plan?)
// - not comparison or neither "AND" nor "OR" operations, for example, "a LIKE x"
fn find_regions0(
partition_rule: PartitionRuleRef<Error>,
filter: &Expr,
) -> Result<HashSet<RegionNumber>> {
let expr = filter.df_expr();
match expr {
DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if is_compare_op(op) => {
let column_op_value = match (left.as_ref(), right.as_ref()) {
(DfExpr::Column(c), DfExpr::Literal(v)) => Some((&c.name, *op, v)),
(DfExpr::Literal(v), DfExpr::Column(c)) => {
Some((&c.name, reverse_operator(op), v))
}
_ => None,
};
if let Some((column, op, sv)) = column_op_value {
let value = sv
.clone()
.try_into()
.with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?;
return Ok(partition_rule
.find_regions(&[PartitionExpr::new(column, op, value)])?
.into_iter()
.collect::<HashSet<RegionNumber>>());
}
}
DfExpr::BinaryExpr(BinaryExpr { left, op, right })
if matches!(op, Operator::And | Operator::Or) =>
{
let left_regions =
Self::find_regions0(partition_rule.clone(), &(*left.clone()).into())?;
let right_regions =
Self::find_regions0(partition_rule.clone(), &(*right.clone()).into())?;
let regions = match op {
Operator::And => left_regions
.intersection(&right_regions)
.cloned()
.collect::<HashSet<RegionNumber>>(),
Operator::Or => left_regions
.union(&right_regions)
.cloned()
.collect::<HashSet<RegionNumber>>(),
_ => unreachable!(),
};
return Ok(regions);
}
_ => (),
}
// Returns all regions for not supported partition expr as a safety hatch.
Ok(partition_rule
.find_regions(&[])?
.into_iter()
.collect::<HashSet<RegionNumber>>())
}
async fn find_datanodes(
&self,
regions: Vec<RegionNumber>,
) -> Result<HashMap<Peer, Vec<RegionNumber>>> {
let route = self.table_routes.get_route(&self.table_name).await?;
let mut datanodes = HashMap::new();
for region in regions.iter() {
let datanode = route
.region_routes
.iter()
.find_map(|x| {
if x.region.id == *region as u64 {
x.leader_peer.clone()
} else {
None
}
})
.context(error::FindDatanodeSnafu { region: *region })?;
datanodes
.entry(datanode)
.or_insert_with(Vec::new)
.push(*region);
}
Ok(datanodes)
}
async fn find_partition_rule(&self) -> Result<PartitionRuleRef<Error>> {
let route = self.table_routes.get_route(&self.table_name).await?;
ensure!(
!route.region_routes.is_empty(),
error::FindRegionRoutesSnafu {
table_name: self.table_name.to_string()
}
);
let mut partitions = Vec::with_capacity(route.region_routes.len());
for r in route.region_routes.iter() {
let partition =
r.region
.partition
.clone()
.context(error::FindRegionPartitionSnafu {
region: r.region.id,
table_name: self.table_name.to_string(),
})?;
let partition_def: PartitionDef = partition.try_into()?;
partitions.push((r.region.id, partition_def));
}
partitions.sort_by(|a, b| a.1.partition_bounds().cmp(b.1.partition_bounds()));
ensure!(
partitions
.windows(2)
.all(|w| w[0].1.partition_columns() == w[1].1.partition_columns()),
error::IllegalTableRoutesDataSnafu {
table_name: self.table_name.to_string(),
err_msg: "partition columns of all regions are not the same"
}
);
let partition_columns = partitions[0].1.partition_columns();
ensure!(
!partition_columns.is_empty(),
error::IllegalTableRoutesDataSnafu {
table_name: self.table_name.to_string(),
err_msg: "no partition columns found"
}
);
let regions = partitions
.iter()
.map(|x| x.0 as u32)
.collect::<Vec<RegionNumber>>();
// TODO(LFC): Serializing and deserializing partition rule is ugly, must find a much more elegant way.
let partition_rule: PartitionRuleRef<Error> = match partition_columns.len() {
1 => {
// Omit the last "MAXVALUE".
let bounds = partitions
.iter()
.filter_map(|(_, p)| match &p.partition_bounds()[0] {
PartitionBound::Value(v) => Some(v.clone()),
PartitionBound::MaxValue => None,
})
.collect::<Vec<Value>>();
Arc::new(RangePartitionRule::new(
partition_columns[0].clone(),
bounds,
regions,
)) as _
}
_ => {
let bounds = partitions
.iter()
.map(|x| x.1.partition_bounds().clone())
.collect::<Vec<Vec<PartitionBound>>>();
Arc::new(RangeColumnsPartitionRule::new(
partition_columns.clone(),
bounds,
regions,
)) as _
}
};
Ok(partition_rule)
}
pub(crate) async fn table_global_value(
&self,
key: &TableGlobalKey,
@@ -394,9 +178,9 @@ impl DistTable {
.backend
.get(key.to_string().as_bytes())
.await
.context(CatalogSnafu)?;
.context(error::CatalogSnafu)?;
Ok(if let Some(raw) = raw {
Some(TableGlobalValue::from_bytes(raw.1).context(CatalogEntrySerdeSnafu)?)
Some(TableGlobalValue::from_bytes(raw.1).context(error::CatalogEntrySerdeSnafu)?)
} else {
None
})
@@ -407,17 +191,17 @@ impl DistTable {
key: TableGlobalKey,
value: TableGlobalValue,
) -> Result<()> {
let value = value.as_bytes().context(CatalogEntrySerdeSnafu)?;
let value = value.as_bytes().context(error::CatalogEntrySerdeSnafu)?;
self.backend
.set(key.to_string().as_bytes(), &value)
.await
.context(CatalogSnafu)
.context(error::CatalogSnafu)
}
async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> {
let alter_expr = context
.get::<AlterExpr>()
.context(ContextValueNotFoundSnafu { key: "AlterExpr" })?;
.context(error::ContextValueNotFoundSnafu { key: "AlterExpr" })?;
self.alter_by_expr(alter_expr).await?;
@@ -426,9 +210,9 @@ impl DistTable {
let new_meta = table_info
.meta
.builder_with_alter_kind(table_name, &request.alter_kind)
.context(TableSnafu)?
.context(error::TableSnafu)?
.build()
.context(BuildTableMetaSnafu {
.context(error::BuildTableMetaSnafu {
table_name: table_name.clone(),
})?;
@@ -441,12 +225,12 @@ impl DistTable {
schema_name: alter_expr.schema_name.clone(),
table_name: alter_expr.table_name.clone(),
};
let mut value = self
.table_global_value(&key)
.await?
.context(TableNotFoundSnafu {
table_name: alter_expr.table_name.clone(),
})?;
let mut value =
self.table_global_value(&key)
.await?
.context(error::TableNotFoundSnafu {
table_name: alter_expr.table_name.clone(),
})?;
value.table_info = new_info.into();
@@ -456,11 +240,17 @@ impl DistTable {
/// Define a `alter_by_expr` instead of impl [`Table::alter`] to avoid redundant conversion between
/// [`table::requests::AlterTableRequest`] and [`AlterExpr`].
async fn alter_by_expr(&self, expr: &AlterExpr) -> Result<()> {
let table_routes = self.table_routes.get_route(&self.table_name).await?;
let table_routes = self
.partition_manager
.find_table_route(&self.table_name)
.await
.with_context(|_| error::FindTableRouteSnafu {
table_name: self.table_name.to_string(),
})?;
let leaders = table_routes.find_leaders();
ensure!(
!leaders.is_empty(),
LeaderNotFoundSnafu {
error::LeaderNotFoundSnafu {
table: format!(
"{:?}.{:?}.{}",
expr.catalog_name, expr.schema_name, expr.table_name
@@ -473,7 +263,10 @@ impl DistTable {
self.datanode_clients.get_client(&datanode).await,
);
debug!("Sending {:?} to {:?}", expr, db);
let result = db.alter(expr.clone()).await.context(RequestDatanodeSnafu)?;
let result = db
.alter(expr.clone())
.await
.context(error::RequestDatanodeSnafu)?;
debug!("Alter table result: {:?}", result);
// TODO(hl): We should further check and track alter result in some global DDL task tracker
}
@@ -494,28 +287,6 @@ fn project_schema(table_schema: SchemaRef, projection: Option<&Vec<usize>>) -> S
}
}
fn is_compare_op(op: &Operator) -> bool {
matches!(
*op,
Operator::Eq
| Operator::NotEq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq
)
}
fn reverse_operator(op: &Operator) -> Operator {
match *op {
Operator::Lt => Operator::Gt,
Operator::Gt => Operator::Lt,
Operator::LtEq => Operator::GtEq,
Operator::GtEq => Operator::LtEq,
_ => *op,
}
}
#[derive(Debug)]
struct DistTableScan {
schema: SchemaRef,
@@ -604,6 +375,8 @@ impl PartitionExec {
#[cfg(test)]
mod test {
use std::collections::HashMap;
use api::v1::column::SemanticType;
use api::v1::{column, Column, ColumnDataType, InsertRequest};
use catalog::error::Result;
@@ -617,7 +390,7 @@ mod test {
use datafusion::prelude::SessionContext;
use datafusion::sql::sqlparser;
use datafusion_expr::expr_fn::{and, binary_expr, col, or};
use datafusion_expr::lit;
use datafusion_expr::{lit, Operator};
use datanode::instance::Instance;
use datatypes::arrow::compute::SortOptions;
use datatypes::prelude::ConcreteDataType;
@@ -626,14 +399,20 @@ mod test {
use meta_client::client::MetaClient;
use meta_client::rpc::router::RegionRoute;
use meta_client::rpc::{Region, Table, TableRoute};
use partition::columns::RangeColumnsPartitionRule;
use partition::manager::PartitionRuleManager;
use partition::partition::{PartitionBound, PartitionDef};
use partition::range::RangePartitionRule;
use partition::route::TableRoutes;
use partition::PartitionRuleRef;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use store_api::storage::RegionNumber;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::TableRef;
use super::*;
use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
use crate::partitioning::range::RangePartitionRule;
struct DummyKvBackend;
@@ -667,33 +446,8 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_find_partition_rule() {
let table_name = TableName::new("greptime", "public", "foo");
let column_schemas = vec![
ColumnSchema::new("ts", ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas.clone()));
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();
let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default())));
let table = DistTable {
table_name: table_name.clone(),
table_info: Arc::new(table_info),
table_routes: table_routes.clone(),
datanode_clients: Arc::new(DatanodeClients::new()),
backend: Arc::new(DummyKvBackend),
};
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes.clone()));
let table_route = TableRoute {
table: Table {
@@ -759,7 +513,10 @@ mod test {
.insert_table_route(table_name.clone(), Arc::new(table_route))
.await;
let partition_rule = table.find_partition_rule().await.unwrap();
let partition_rule = partition_manager
.find_table_partition_rule(&table_name)
.await
.unwrap();
let range_rule = partition_rule
.as_any()
.downcast_ref::<RangePartitionRule>()
@@ -838,7 +595,10 @@ mod test {
.insert_table_route(table_name.clone(), Arc::new(table_route))
.await;
let partition_rule = table.find_partition_rule().await.unwrap();
let partition_rule = partition_manager
.find_table_partition_rule(&table_name)
.await
.unwrap();
let range_columns_rule = partition_rule
.as_any()
.downcast_ref::<RangeColumnsPartitionRule>()
@@ -1035,7 +795,7 @@ mod test {
let datanode_instances = instance.datanodes;
let catalog_manager = dist_instance.catalog_manager();
let table_routes = catalog_manager.table_routes();
let partition_manager = catalog_manager.partition_manager();
let datanode_clients = catalog_manager.datanode_clients();
let table_name = TableName::new("greptime", "public", "dist_numbers");
@@ -1074,7 +834,10 @@ mod test {
.await
.unwrap();
let table_route = table_routes.get_route(&table_name).await.unwrap();
let table_route = partition_manager
.find_table_route(&table_name)
.await
.unwrap();
let mut region_to_datanode_mapping = HashMap::new();
for region_route in table_route.region_routes.iter() {
@@ -1114,7 +877,7 @@ mod test {
DistTable {
table_name,
table_info: Arc::new(table_info),
table_routes,
partition_manager,
datanode_clients,
backend: catalog_manager.backend(),
}
@@ -1169,30 +932,9 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_find_regions() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::int32_datatype(),
true,
)]));
let table_name = TableName::new("greptime", "public", "foo");
let meta = TableMetaBuilder::default()
.schema(schema)
.primary_key_indices(vec![])
.next_column_id(1)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name.table_name)
.meta(meta)
.build()
.unwrap();
let table = DistTable {
table_name,
table_info: Arc::new(table_info),
table_routes: Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))),
datanode_clients: Arc::new(DatanodeClients::new()),
backend: Arc::new(DummyKvBackend),
};
let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new(
Arc::new(MetaClient::default()),
))));
// PARTITION BY RANGE (a) (
// PARTITION r1 VALUES LESS THAN (10),
@@ -1200,18 +942,18 @@ mod test {
// PARTITION r3 VALUES LESS THAN (50),
// PARTITION r4 VALUES LESS THAN (MAXVALUE),
// )
let partition_rule: PartitionRuleRef<Error> = Arc::new(RangePartitionRule::new(
let partition_rule: PartitionRuleRef = Arc::new(RangePartitionRule::new(
"a",
vec![10_i32.into(), 20_i32.into(), 50_i32.into()],
vec![0_u32, 1, 2, 3],
)) as _;
let partition_rule_clone = partition_rule.clone();
let test = |filters: Vec<Expr>, expect_regions: Vec<RegionNumber>| {
let mut regions = table
.find_regions(partition_rule.clone(), filters.as_slice())
let mut regions = partition_manager
.find_regions_by_filters(partition_rule_clone.clone(), filters.as_slice())
.unwrap();
regions.sort();
assert_eq!(regions, expect_regions);
};
@@ -1298,7 +1040,7 @@ mod test {
);
// test failed to find regions by contradictory filters
let regions = table.find_regions(
let regions = partition_manager.find_regions_by_filters(
partition_rule,
vec![and(
binary_expr(col("a"), Operator::Lt, lit(20)),
@@ -1309,7 +1051,7 @@ mod test {
); // a < 20 AND a >= 20
assert!(matches!(
regions.unwrap_err(),
error::Error::FindRegions { .. }
partition::error::Error::FindRegions { .. }
));
}
}


@@ -27,7 +27,7 @@ use table::requests::InsertRequest;
use super::DistTable;
use crate::error;
use crate::error::Result;
use crate::error::{FindTableRouteSnafu, Result};
use crate::table::scan::DatanodeInstance;
impl DistTable {
@@ -35,8 +35,13 @@ impl DistTable {
&self,
inserts: HashMap<RegionNumber, InsertRequest>,
) -> Result<Output> {
let route = self.table_routes.get_route(&self.table_name).await?;
let route = self
.partition_manager
.find_table_route(&self.table_name)
.await
.with_context(|_| FindTableRouteSnafu {
table_name: self.table_name.to_string(),
})?;
let mut joins = Vec::with_capacity(inserts.len());
for (region_id, insert) in inserts {
let datanode = route


@@ -28,6 +28,8 @@ use meta_srv::metasrv::MetaSrvOptions;
use meta_srv::mocks::MockInfo;
use meta_srv::service::store::kv::KvStoreRef;
use meta_srv::service::store::memory::MemStore;
use partition::manager::PartitionRuleManager;
use partition::route::TableRoutes;
use servers::grpc::GrpcServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::Mode;
@@ -39,7 +41,6 @@ use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::instance::distributed::DistInstance;
use crate::instance::Instance;
use crate::table::route::TableRoutes;
/// Guard against the `TempDir`s that used in unit tests.
/// (The `TempDir` will be deleted once it goes out of scope.)
@@ -241,10 +242,12 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistribu
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let partition_manager = Arc::new(PartitionRuleManager::new(Arc::new(TableRoutes::new(
meta_client.clone(),
))));
let catalog_manager = Arc::new(FrontendCatalogManager::new(
meta_backend,
table_routes.clone(),
partition_manager,
datanode_clients.clone(),
));

src/partition/Cargo.toml (new file)

@@ -0,0 +1,22 @@
[package]
name = "partition"
version.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes = { path = "../datatypes" }
meta-client = { path = "../meta-client" }
moka = { version = "0.9", features = ["future"] }
snafu.workspace = true
store-api = { path = "../store-api" }
serde.workspace = true
serde_json = "1.0"
table = { path = "../table" }


@@ -20,7 +20,7 @@ use snafu::ensure;
use store_api::storage::RegionNumber;
use crate::error::{self, Error};
use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule};
use crate::partition::{PartitionBound, PartitionExpr, PartitionRule};
/// A [RangeColumnsPartitionRule] is very similar to [RangePartitionRule] except that it allows
/// partitioning by multiple columns.
@@ -77,7 +77,7 @@ pub struct RangeColumnsPartitionRule {
impl RangeColumnsPartitionRule {
// It's assured that input arguments are valid because they are checked in the SQL parsing stage.
// So we can skip validating them.
pub(crate) fn new(
pub fn new(
column_list: Vec<String>,
value_lists: Vec<Vec<PartitionBound>>,
regions: Vec<RegionNumber>,
@@ -123,25 +123,20 @@ impl RangeColumnsPartitionRule {
}
}
#[cfg(test)]
pub(crate) fn column_list(&self) -> &Vec<String> {
pub fn column_list(&self) -> &Vec<String> {
&self.column_list
}
#[cfg(test)]
pub(crate) fn value_lists(&self) -> &Vec<Vec<PartitionBound>> {
pub fn value_lists(&self) -> &Vec<Vec<PartitionBound>> {
&self.value_lists
}
#[cfg(test)]
pub(crate) fn regions(&self) -> &Vec<RegionNumber> {
pub fn regions(&self) -> &Vec<RegionNumber> {
&self.regions
}
}
impl PartitionRule for RangeColumnsPartitionRule {
type Error = Error;
fn as_any(&self) -> &dyn Any {
self
}
@@ -150,7 +145,7 @@ impl PartitionRule for RangeColumnsPartitionRule {
self.column_list.clone()
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error> {
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error> {
ensure!(
values.len() == self.column_list.len(),
error::RegionKeysSizeSnafu {
@@ -171,7 +166,7 @@ impl PartitionRule for RangeColumnsPartitionRule {
})
}
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Self::Error> {
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
let regions = if exprs.iter().all(|x| self.column_list.contains(&x.column)) {
let PartitionExpr {
column: _,
@@ -220,6 +215,7 @@ mod tests {
use std::assert_matches::assert_matches;
use super::*;
use crate::partition::{PartitionBound, PartitionExpr};
#[test]
fn test_find_regions() {

src/partition/src/error.rs (new file)

@@ -0,0 +1,155 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::prelude::*;
use common_query::prelude::Expr;
use datafusion_common::ScalarValue;
use snafu::Snafu;
use store_api::storage::RegionId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to get cache, error: {}", err_msg))]
GetCache {
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to request Meta, source: {}", source))]
RequestMeta {
#[snafu(backtrace)]
source: meta_client::error::Error,
},
#[snafu(display("Failed to find Datanode, table: {} region: {:?}", table, region))]
FindDatanode {
table: String,
region: RegionId,
backtrace: Backtrace,
},
#[snafu(display("Failed to find table routes for table {}", table_name))]
FindTableRoutes {
table_name: String,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to find region routes for table {}, region id: {}",
table_name,
region_id
))]
FindRegionRoutes {
table_name: String,
region_id: u64,
backtrace: Backtrace,
},
#[snafu(display("Failed to serialize value to json, source: {}", source))]
SerializeJson {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to deserialize value from json, source: {}", source))]
DeserializeJson {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display("Expect {} region keys, actual {}", expect, actual))]
RegionKeysSize {
expect: usize,
actual: usize,
backtrace: Backtrace,
},
#[snafu(display("Failed to find region, reason: {}", reason))]
FindRegion {
reason: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to find regions by filters: {:?}", filters))]
FindRegions {
filters: Vec<Expr>,
backtrace: Backtrace,
},
#[snafu(display("Failed to find partition column: {}", column_name))]
FindPartitionColumn {
column_name: String,
backtrace: Backtrace,
},
#[snafu(display("Invalid InsertRequest, reason: {}", reason))]
InvalidInsertRequest {
reason: String,
backtrace: Backtrace,
},
#[snafu(display(
"Invalid table route data in meta, table name: {}, msg: {}",
table_name,
err_msg
))]
InvalidTableRouteData {
table_name: String,
err_msg: String,
backtrace: Backtrace,
},
#[snafu(display(
"Failed to convert DataFusion's ScalarValue: {:?}, source: {}",
value,
source
))]
ConvertScalarValue {
value: ScalarValue,
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::GetCache { .. } => StatusCode::StorageUnavailable,
Error::FindRegionRoutes { .. } => StatusCode::InvalidArguments,
Error::FindTableRoutes { .. } => StatusCode::InvalidArguments,
Error::RequestMeta { source, .. } => source.status_code(),
Error::FindRegion { .. }
| Error::FindRegions { .. }
| Error::RegionKeysSize { .. }
| Error::InvalidInsertRequest { .. }
| Error::FindPartitionColumn { .. } => StatusCode::InvalidArguments,
Error::SerializeJson { .. } | Error::DeserializeJson { .. } => StatusCode::Internal,
Error::InvalidTableRouteData { .. } => StatusCode::Internal,
Error::ConvertScalarValue { .. } => StatusCode::Internal,
Error::FindDatanode { .. } => StatusCode::InvalidArguments,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
pub type Result<T> = std::result::Result<T, Error>;
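
For orientation, here is a minimal sketch (a hypothetical helper, written as if inside this crate) of how these variants are raised through snafu's generated selectors:

use snafu::{ensure, OptionExt};

use crate::error::{self, Result};

fn pick_region(regions: &[u32], keys: &[String], expected: usize) -> Result<u32> {
    // RegionKeysSize reports an arity mismatch between the rule and request.
    ensure!(
        keys.len() == expected,
        error::RegionKeysSizeSnafu {
            expect: expected,
            actual: keys.len(),
        }
    );
    // FindRegion is raised when no region matches at all.
    regions.first().copied().context(error::FindRegionSnafu {
        reason: "no regions configured",
    })
}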

src/partition/src/lib.rs (new file)

@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
pub mod columns;
pub mod error;
pub mod manager;
pub mod partition;
pub mod range;
pub mod route;
pub mod splitter;
pub use crate::partition::{PartitionRule, PartitionRuleRef};


@@ -0,0 +1,282 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_query::prelude::Expr;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::prelude::Value;
use meta_client::rpc::{Peer, TableName, TableRoute};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::requests::InsertRequest;
use crate::columns::RangeColumnsPartitionRule;
use crate::error::Result;
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::route::TableRoutes;
use crate::splitter::{InsertRequestSplit, WriteSplitter};
use crate::{error, PartitionRuleRef};
pub type PartitionRuleManagerRef = Arc<PartitionRuleManager>;
/// PartitionRuleManager manages the table routes and partition rules.
/// It provides methods to find regions by:
/// - values (in case of insertion)
/// - filters (in case of select, deletion and update)
pub struct PartitionRuleManager {
table_routes: Arc<TableRoutes>,
}
impl PartitionRuleManager {
pub fn new(table_routes: Arc<TableRoutes>) -> Self {
Self { table_routes }
}
/// Find the table route for the given table.
pub async fn find_table_route(&self, table: &TableName) -> Result<Arc<TableRoute>> {
self.table_routes.get_route(table).await
}
/// Find the datanodes hosting the given regions of the table.
pub async fn find_region_datanodes(
&self,
table: &TableName,
regions: Vec<RegionNumber>,
) -> Result<HashMap<Peer, Vec<RegionNumber>>> {
let route = self.table_routes.get_route(table).await?;
let mut datanodes = HashMap::with_capacity(regions.len());
for region in regions.iter() {
let datanode = route
.region_routes
.iter()
.find_map(|x| {
if x.region.id == *region as u64 {
x.leader_peer.clone()
} else {
None
}
})
.context(error::FindDatanodeSnafu {
table: table.to_string(),
region: *region,
})?;
datanodes
.entry(datanode)
.or_insert_with(Vec::new)
.push(*region);
}
Ok(datanodes)
}
/// Get the partition rule of the given table.
pub async fn find_table_partition_rule(&self, table: &TableName) -> Result<PartitionRuleRef> {
let route = self.table_routes.get_route(table).await?;
ensure!(
!route.region_routes.is_empty(),
error::FindTableRoutesSnafu {
table_name: table.to_string()
}
);
let mut partitions = Vec::with_capacity(route.region_routes.len());
for r in route.region_routes.iter() {
let partition = r
.region
.partition
.clone()
.context(error::FindRegionRoutesSnafu {
region_id: r.region.id,
table_name: table.to_string(),
})?;
let partition_def = PartitionDef::try_from(partition)?;
partitions.push((r.region.id, partition_def));
}
partitions.sort_by(|a, b| a.1.partition_bounds().cmp(b.1.partition_bounds()));
ensure!(
partitions
.windows(2)
.all(|w| w[0].1.partition_columns() == w[1].1.partition_columns()),
error::InvalidTableRouteDataSnafu {
table_name: table.to_string(),
err_msg: "partition columns of all regions are not the same"
}
);
let partition_columns = partitions[0].1.partition_columns();
ensure!(
!partition_columns.is_empty(),
error::InvalidTableRouteDataSnafu {
table_name: table.to_string(),
err_msg: "no partition columns found"
}
);
let regions = partitions
.iter()
.map(|x| x.0 as u32)
.collect::<Vec<RegionNumber>>();
// TODO(LFC): Serializing and deserializing partition rule is ugly, must find a much more elegant way.
let partition_rule: PartitionRuleRef = match partition_columns.len() {
1 => {
// Omit the last "MAXVALUE".
let bounds = partitions
.iter()
.filter_map(|(_, p)| match &p.partition_bounds()[0] {
PartitionBound::Value(v) => Some(v.clone()),
PartitionBound::MaxValue => None,
})
.collect::<Vec<Value>>();
Arc::new(RangePartitionRule::new(
partition_columns[0].clone(),
bounds,
regions,
)) as _
}
_ => {
let bounds = partitions
.iter()
.map(|x| x.1.partition_bounds().clone())
.collect::<Vec<Vec<PartitionBound>>>();
Arc::new(RangeColumnsPartitionRule::new(
partition_columns.clone(),
bounds,
regions,
)) as _
}
};
Ok(partition_rule)
}
/// Find regions in partition rule by filters.
pub fn find_regions_by_filters(
&self,
partition_rule: PartitionRuleRef,
filters: &[Expr],
) -> Result<Vec<RegionNumber>> {
let regions = if let Some((first, rest)) = filters.split_first() {
let mut target = find_regions0(partition_rule.clone(), first)?;
for filter in rest {
let regions = find_regions0(partition_rule.clone(), filter)?;
// Passing several filters at once implicitly means "all filters must be
// satisfied", so we intersect the per-filter results here.
target.retain(|x| regions.contains(x));
// Fail fast: intersecting with an empty set stays empty.
if target.is_empty() {
break;
}
}
target.into_iter().collect::<Vec<_>>()
} else {
partition_rule.find_regions(&[])?
};
ensure!(
!regions.is_empty(),
error::FindRegionsSnafu {
filters: filters.to_vec()
}
);
Ok(regions)
}
/// Split [InsertRequest] into [InsertRequestSplit] according to the partition rule
/// of the given table.
pub async fn split_insert_request(
&self,
table: &TableName,
req: InsertRequest,
) -> Result<InsertRequestSplit> {
let partition_rule = self.find_table_partition_rule(table).await?;
let splitter = WriteSplitter::with_partition_rule(partition_rule);
splitter.split_insert(req)
}
}
fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<HashSet<RegionNumber>> {
let expr = filter.df_expr();
match expr {
DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if is_compare_op(op) => {
let column_op_value = match (left.as_ref(), right.as_ref()) {
(DfExpr::Column(c), DfExpr::Literal(v)) => Some((&c.name, *op, v)),
(DfExpr::Literal(v), DfExpr::Column(c)) => Some((&c.name, reverse_operator(op), v)),
_ => None,
};
if let Some((column, op, scalar)) = column_op_value {
let value = Value::try_from(scalar.clone()).with_context(|_| {
error::ConvertScalarValueSnafu {
value: scalar.clone(),
}
})?;
return Ok(partition_rule
.find_regions(&[PartitionExpr::new(column, op, value)])?
.into_iter()
.collect::<HashSet<RegionNumber>>());
}
}
DfExpr::BinaryExpr(BinaryExpr { left, op, right })
if matches!(op, Operator::And | Operator::Or) =>
{
let left_regions = find_regions0(partition_rule.clone(), &(*left.clone()).into())?;
let right_regions = find_regions0(partition_rule.clone(), &(*right.clone()).into())?;
let regions = match op {
Operator::And => left_regions
.intersection(&right_regions)
.cloned()
.collect::<HashSet<RegionNumber>>(),
Operator::Or => left_regions
.union(&right_regions)
.cloned()
.collect::<HashSet<RegionNumber>>(),
_ => unreachable!(),
};
return Ok(regions);
}
_ => (),
}
// Return all regions as a safety fallback for unsupported partition exprs.
Ok(partition_rule
.find_regions(&[])?
.into_iter()
.collect::<HashSet<RegionNumber>>())
}
#[inline]
fn is_compare_op(op: &Operator) -> bool {
matches!(
*op,
Operator::Eq
| Operator::NotEq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq
)
}
#[inline]
fn reverse_operator(op: &Operator) -> Operator {
match *op {
Operator::Lt => Operator::Gt,
Operator::Gt => Operator::Lt,
Operator::LtEq => Operator::GtEq,
Operator::GtEq => Operator::LtEq,
_ => *op,
}
}
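
Taken together, the read path that DistTable::scan used to hand-roll now reads like this sketch (a hypothetical caller; building the underlying TableRoutes requires a live MetaClient, as in instance.rs):

use partition::manager::PartitionRuleManagerRef;

async fn regions_per_datanode(
    manager: &PartitionRuleManagerRef,
    table: &meta_client::rpc::TableName,
    filters: &[common_query::prelude::Expr],
) -> partition::error::Result<()> {
    // 1. Rebuild the table's partition rule from the cached route.
    let rule = manager.find_table_partition_rule(table).await?;
    // 2. Prune regions: every filter must hold, so per-filter results intersect.
    let regions = manager.find_regions_by_filters(rule, filters)?;
    // 3. Group the surviving regions under their leader datanodes.
    let datanodes = manager.find_region_datanodes(table, regions).await?;
    for (_peer, regions) in &datanodes {
        println!("one datanode serves regions {regions:?}");
    }
    Ok(())
}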


@@ -12,14 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod columns;
pub(crate) mod range;
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
pub use datafusion_expr::Operator;
use datafusion_expr::Operator;
use datatypes::prelude::Value;
use meta_client::rpc::Partition as MetaPartition;
use serde::{Deserialize, Serialize};
@@ -28,51 +25,46 @@ use store_api::storage::RegionNumber;
use crate::error::{self, Error};
pub(crate) type PartitionRuleRef<E> = Arc<dyn PartitionRule<Error = E>>;
pub type PartitionRuleRef = Arc<dyn PartitionRule>;
pub trait PartitionRule: Sync + Send {
type Error: Debug;
fn as_any(&self) -> &dyn Any;
fn partition_columns(&self) -> Vec<String>;
// TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop.
// Or find better names since one is mainly for writes and the other is for reads.
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error>;
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error>;
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Self::Error>;
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error>;
}
/// The right bound(exclusive) of partition range.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub(crate) enum PartitionBound {
pub enum PartitionBound {
Value(Value),
MaxValue,
}
#[derive(Debug)]
pub(crate) struct PartitionDef {
pub struct PartitionDef {
partition_columns: Vec<String>,
partition_bounds: Vec<PartitionBound>,
}
impl PartitionDef {
pub(crate) fn new(
partition_columns: Vec<String>,
partition_bounds: Vec<PartitionBound>,
) -> Self {
pub fn new(partition_columns: Vec<String>, partition_bounds: Vec<PartitionBound>) -> Self {
Self {
partition_columns,
partition_bounds,
}
}
pub(crate) fn partition_columns(&self) -> &Vec<String> {
pub fn partition_columns(&self) -> &Vec<String> {
&self.partition_columns
}
pub(crate) fn partition_bounds(&self) -> &Vec<PartitionBound> {
pub fn partition_bounds(&self) -> &Vec<PartitionBound> {
&self.partition_bounds
}
}
@@ -133,13 +125,13 @@ impl TryFrom<PartitionDef> for MetaPartition {
#[derive(Debug, PartialEq, Eq)]
pub struct PartitionExpr {
column: String,
op: Operator,
value: Value,
pub column: String,
pub op: Operator,
pub value: Value,
}
impl PartitionExpr {
pub(crate) fn new(column: impl Into<String>, op: Operator, value: Value) -> Self {
pub fn new(column: impl Into<String>, op: Operator, value: Value) -> Self {
Self {
column: column.into(),
op,


@@ -14,13 +14,14 @@
use std::any::Any;
use datafusion_expr::Operator;
use datatypes::prelude::*;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use store_api::storage::RegionNumber;
use crate::error::{self, Error};
use crate::partitioning::{Operator, PartitionExpr, PartitionRule};
use crate::partition::{PartitionExpr, PartitionRule};
/// [RangePartitionRule] manages the distribution of partitions partitioning by some column's value
/// range. It's generated from create table request, using MySQL's syntax:
@@ -70,7 +71,7 @@ pub struct RangePartitionRule {
}
impl RangePartitionRule {
pub(crate) fn new(
pub fn new(
column_name: impl Into<String>,
bounds: Vec<Value>,
regions: Vec<RegionNumber>,
@@ -82,23 +83,20 @@ impl RangePartitionRule {
}
}
pub(crate) fn column_name(&self) -> &String {
pub fn column_name(&self) -> &String {
&self.column_name
}
pub(crate) fn all_regions(&self) -> &Vec<RegionNumber> {
pub fn all_regions(&self) -> &Vec<RegionNumber> {
&self.regions
}
#[cfg(test)]
pub(crate) fn bounds(&self) -> &Vec<Value> {
pub fn bounds(&self) -> &Vec<Value> {
&self.bounds
}
}
impl PartitionRule for RangePartitionRule {
type Error = Error;
fn as_any(&self) -> &dyn Any {
self
}
@@ -107,7 +105,7 @@ impl PartitionRule for RangePartitionRule {
vec![self.column_name().to_string()]
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error> {
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error> {
debug_assert_eq!(
values.len(),
1,
@@ -122,7 +120,7 @@ impl PartitionRule for RangePartitionRule {
})
}
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Self::Error> {
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionNumber>, Error> {
if exprs.is_empty() {
return Ok(self.regions.clone());
}
@@ -173,7 +171,10 @@ impl PartitionRule for RangePartitionRule {
#[cfg(test)]
mod test {
use datafusion_expr::Operator;
use super::*;
use crate::partition::PartitionExpr;
#[test]
fn test_find_regions() {


@@ -22,13 +22,14 @@ use snafu::{ensure, ResultExt};
use crate::error::{self, Result};
pub(crate) struct TableRoutes {
pub struct TableRoutes {
meta_client: Arc<MetaClient>,
cache: Cache<TableName, Arc<TableRoute>>,
}
// TODO(hl): maybe periodically refresh table route cache?
impl TableRoutes {
pub(crate) fn new(meta_client: Arc<MetaClient>) -> Self {
pub fn new(meta_client: Arc<MetaClient>) -> Self {
Self {
meta_client,
cache: CacheBuilder::new(1024)
@@ -38,7 +39,7 @@ impl TableRoutes {
}
}
pub(crate) async fn get_route(&self, table_name: &TableName) -> Result<Arc<TableRoute>> {
pub async fn get_route(&self, table_name: &TableName) -> Result<Arc<TableRoute>> {
self.cache
.try_get_with_by_ref(table_name, self.get_from_meta(table_name))
.await
@@ -68,12 +69,7 @@ impl TableRoutes {
Ok(Arc::new(route))
}
#[cfg(test)]
pub(crate) async fn insert_table_route(
&self,
table_name: TableName,
table_route: Arc<TableRoute>,
) {
pub async fn insert_table_route(&self, table_name: TableName, table_route: Arc<TableRoute>) {
self.cache.insert(table_name, table_route).await
}
}
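
get_route above funnels every lookup through moka's try_get_with_by_ref, so concurrent misses for one table share a single meta-client call and failed loads are never cached. A standalone sketch of that pattern with stand-in types (a String route instead of the real TableRoute):

use std::sync::Arc;

use moka::future::{Cache, CacheBuilder};

async fn fetch(table: &str) -> Result<Arc<String>, std::io::Error> {
    // Stand-in for the meta-client round trip in get_from_meta.
    Ok(Arc::new(format!("route-for-{table}")))
}

async fn get_route(
    cache: &Cache<String, Arc<String>>,
    table: &str,
) -> Result<Arc<String>, Arc<std::io::Error>> {
    // Callers awaiting the same key share one `fetch` future; an Err is
    // propagated to all of them but not stored in the cache.
    cache.try_get_with_by_ref(table, fetch(table)).await
}

fn new_cache() -> Cache<String, Arc<String>> {
    CacheBuilder::new(1024).build()
}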


@@ -20,34 +20,33 @@ use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use snafu::{ensure, OptionExt};
use store_api::storage::RegionNumber;
use table::requests::InsertRequest;
use table::requests::{DeleteRequest, InsertRequest};
use crate::error::{
Error, FindPartitionColumnSnafu, FindRegionSnafu, InvalidInsertRequestSnafu, Result,
};
use crate::partitioning::PartitionRuleRef;
use crate::error::{FindPartitionColumnSnafu, FindRegionSnafu, InvalidInsertRequestSnafu, Result};
use crate::PartitionRuleRef;
pub type DistInsertRequest = HashMap<RegionNumber, InsertRequest>;
pub type InsertRequestSplit = HashMap<RegionNumber, InsertRequest>;
pub type DeleteRequestSplit = HashMap<RegionNumber, DeleteRequest>;
pub struct WriteSpliter {
partition_rule: PartitionRuleRef<Error>,
pub struct WriteSplitter {
partition_rule: PartitionRuleRef,
}
impl WriteSpliter {
pub fn with_partition_rule(rule: PartitionRuleRef<Error>) -> Self {
impl WriteSplitter {
pub fn with_partition_rule(rule: PartitionRuleRef) -> Self {
Self {
partition_rule: rule,
}
}
pub fn split(&self, insert: InsertRequest) -> Result<DistInsertRequest> {
pub fn split_insert(&self, insert: InsertRequest) -> Result<InsertRequestSplit> {
check_req(&insert)?;
let column_names = self.partition_rule.partition_columns();
let partition_columns = find_partitioning_values(&insert, &column_names)?;
let region_map = self.split_partitioning_values(&partition_columns)?;
Ok(partition_insert_request(&insert, region_map))
Ok(split_insert_request(&insert, region_map))
}
fn split_partitioning_values(
@@ -80,10 +79,6 @@ impl WriteSpliter {
}
fn check_req(insert: &InsertRequest) -> Result<()> {
check_vectors_len(insert)
}
fn check_vectors_len(insert: &InsertRequest) -> Result<()> {
let mut len: Option<usize> = None;
for vector in insert.columns_values.values() {
match len {
@@ -123,10 +118,10 @@ fn partition_values(partition_columns: &[VectorRef], idx: usize) -> Vec<Value> {
.collect()
}
fn partition_insert_request(
fn split_insert_request(
insert: &InsertRequest,
region_map: HashMap<RegionNumber, Vec<usize>>,
) -> DistInsertRequest {
) -> InsertRequestSplit {
let mut dist_insert: HashMap<RegionNumber, HashMap<&str, Box<dyn MutableVector>>> =
HashMap::with_capacity(region_map.len());
@@ -185,7 +180,6 @@ mod tests {
use std::result::Result;
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::ScalarVectorBuilder;
use datatypes::types::StringType;
@@ -198,11 +192,11 @@ mod tests {
use table::requests::InsertRequest;
use super::{
check_req, find_partitioning_values, partition_insert_request, partition_values,
WriteSpliter,
check_req, find_partitioning_values, partition_values, split_insert_request, WriteSplitter,
};
use crate::error::Error;
use crate::partitioning::{PartitionExpr, PartitionRule, PartitionRuleRef};
use crate::partition::{PartitionExpr, PartitionRule};
use crate::PartitionRuleRef;
#[test]
fn test_insert_req_check() {
@@ -218,9 +212,9 @@ mod tests {
#[test]
fn test_writer_spliter() {
let insert = mock_insert_request();
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef<Error>;
let spliter = WriteSpliter::with_partition_rule(rule);
let ret = spliter.split(insert).unwrap();
let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
let spliter = WriteSplitter::with_partition_rule(rule);
let ret = spliter.split_insert(insert).unwrap();
assert_eq!(2, ret.len());
@@ -277,7 +271,7 @@ mod tests {
region_map.insert(1, vec![2, 0]);
region_map.insert(2, vec![1]);
let dist_insert = partition_insert_request(&insert, region_map);
let dist_insert = split_insert_request(&insert, region_map);
let r1_insert = dist_insert.get(&1_u32).unwrap();
assert_eq!("demo", r1_insert.table_name);
@@ -402,8 +396,8 @@ mod tests {
columns_values.insert("id".to_string(), builder.to_vector());
InsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "demo".to_string(),
columns_values,
}
@@ -429,8 +423,8 @@ mod tests {
columns_values.insert("id".to_string(), builder.to_vector());
InsertRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "demo".to_string(),
columns_values,
}
@@ -444,8 +438,6 @@ mod tests {
// PARTITION r1 VALUES IN(2, 3),
// );
impl PartitionRule for MockPartitionRule {
type Error = Error;
fn as_any(&self) -> &dyn Any {
self
}
@@ -454,7 +446,7 @@ mod tests {
vec!["id".to_string()]
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Self::Error> {
fn find_region(&self, values: &[Value]) -> Result<RegionNumber, Error> {
let val = values.get(0).unwrap().to_owned();
let id_1: Value = 1_i16.into();
let id_2: Value = 2_i16.into();
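
Finally, a closing usage sketch (hypothetical caller and helper) for the renamed splitter, tying the pieces together: one InsertRequest in, one per-region InsertRequest out.

use partition::splitter::WriteSplitter;
use partition::PartitionRuleRef;
use table::requests::InsertRequest;

fn split_by_region(rule: PartitionRuleRef, insert: InsertRequest) -> partition::error::Result<()> {
    let splitter = WriteSplitter::with_partition_rule(rule);
    // InsertRequestSplit is the HashMap<RegionNumber, InsertRequest> alias
    // introduced above; rows are routed via PartitionRule::find_region.
    let split = splitter.split_insert(insert)?;
    for (region, req) in &split {
        println!("region {region}: {} rows for {}", row_count(req), req.table_name);
    }
    Ok(())
}

// Hypothetical helper: after check_req, all column vectors share one length.
fn row_count(req: &InsertRequest) -> usize {
    req.columns_values.values().next().map(|v| v.len()).unwrap_or(0)
}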