feat/column-partition:

### Refactor and Enhance Partition Handling

 - **Refactor Partition Parsing Logic**: Moved partition parsing logic from `src/operator/src/statement/ddl.rs` to a new utility module `src/partition/src/utils.rs`. This includes functions like `parse_partitions`, `find_partition_bounds`, and `convert_one_expr`.
 - **Error Handling Improvements**: Added new error variants `ColumnNotFound`, `InvalidPartitionRule`, and `ParseSqlValue` in `src/partition/src/error.rs` to improve error reporting for partition-related operations.
 - **Dependency Updates**: Updated `Cargo.lock` and `Cargo.toml` to include new dependencies `common-time` and `session`.
 - **Code Cleanup**: Removed redundant partition parsing functions from `src/operator/src/error.rs` and `src/operator/src/statement/ddl.rs`.
This commit is contained in:
Lei, HUANG
2025-03-28 08:00:02 +00:00
parent 404df92a60
commit 183fa19f23
7 changed files with 274 additions and 200 deletions

2
Cargo.lock generated
View File

@@ -8053,6 +8053,7 @@ dependencies = [
"common-macro",
"common-meta",
"common-query",
"common-time",
"datafusion-common",
"datafusion-expr",
"datafusion-physical-expr",
@@ -8060,6 +8061,7 @@ dependencies = [
"itertools 0.14.0",
"serde",
"serde_json",
"session",
"snafu 0.8.5",
"sql",
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",

View File

@@ -420,13 +420,6 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display("Failed to deserialize partition in meta to partition def"))]
DeserializePartition {
#[snafu(implicit)]
location: Location,
source: partition::error::Error,
},
#[snafu(display("Failed to describe schema for given statement"))]
DescribeStatement {
#[snafu(implicit)]
@@ -643,7 +636,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse sql value"))]
ParseSqlValue {
source: sql::error::Error,
@@ -725,12 +718,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid partition rule: {}", reason))]
InvalidPartitionRule {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid configuration value."))]
InvalidConfigValue {
@@ -923,7 +911,6 @@ impl ErrorExt for Error {
Error::AlterExprToRequest { source, .. } => source.status_code(),
Error::External { source, .. } => source.status_code(),
Error::DeserializePartition { source, .. }
| Error::FindTablePartitionRule { source, .. }
| Error::SplitInsert { source, .. }
| Error::SplitDelete { source, .. }
@@ -947,7 +934,6 @@ impl ErrorExt for Error {
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::EmptyDdlExpr { .. }
| Error::InvalidPartitionRule { .. }
| Error::ParseSqlValue { .. }
| Error::InvalidTimestampRange { .. } => StatusCode::InvalidArguments,

View File

@@ -45,6 +45,7 @@ use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::multi_dim::MultiDimPartitionRule;
use partition::partition::{PartitionBound, PartitionDef};
use partition::utils::parse_partitions;
use query::parser::QueryStatement;
use query::plan::extract_and_rewrite_full_table_names;
use query::query_engine::DefaultSerializer;
@@ -71,8 +72,8 @@ use table::TableRef;
use super::StatementExecutor;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu,
InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu,
SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
@@ -248,7 +249,8 @@ impl StatementExecutor {
&create_table.table_name,
);
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)
.context(error::InvalidPartitionSnafu)?;
let mut table_info = create_table_info(create_table, partition_cols)?;
let resp = self
@@ -1318,40 +1320,6 @@ impl StatementExecutor {
}
}
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: &QueryContextRef,
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
// the partition column, and create only one partition.
let partition_columns = find_partition_columns(&partitions)?;
let partition_entries =
find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
// Validates partition
let mut exprs = vec![];
for partition in &partition_entries {
for bound in partition {
if let PartitionBound::Expr(expr) = bound {
exprs.push(expr.clone());
}
}
}
MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)
.context(InvalidPartitionSnafu)?;
Ok((
partition_entries
.into_iter()
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
.collect::<std::result::Result<_, _>>()
.context(DeserializePartitionSnafu)?,
partition_columns,
))
}
fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
@@ -1438,154 +1406,7 @@ fn create_table_info(
Ok(table_info)
}
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
let columns = if let Some(partitions) = partitions {
partitions
.column_list
.iter()
.map(|x| x.value.clone())
.collect::<Vec<_>>()
} else {
vec![]
};
Ok(columns)
}
/// Parse [Partitions] into a group of partition entries.
///
/// Returns a list of [PartitionBound], each of which defines a partition.
fn find_partition_entries(
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
partition_columns: &[String],
query_ctx: &QueryContextRef,
) -> Result<Vec<Vec<PartitionBound>>> {
let entries = if let Some(partitions) = partitions {
// extract concrete data type of partition columns
let column_defs = partition_columns
.iter()
.map(|pc| {
create_table
.column_defs
.iter()
.find(|c| &c.name == pc)
// unwrap is safe here because we have checked that partition columns are defined
.unwrap()
})
.collect::<Vec<_>>();
let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
for column in column_defs {
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
.context(ColumnDataTypeSnafu)?,
);
column_name_and_type.insert(column_name, data_type);
}
// Transform parser expr to partition expr
let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
for partition in &partitions.exprs {
let partition_expr =
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
}
// fallback for no expr
if partition_exprs.is_empty() {
partition_exprs.push(vec![PartitionBound::MaxValue]);
}
partition_exprs
} else {
vec![vec![PartitionBound::MaxValue]]
};
Ok(entries)
}
fn convert_one_expr(
expr: &Expr,
column_name_and_type: &HashMap<&String, ConcreteDataType>,
timezone: &Timezone,
) -> Result<PartitionExpr> {
let Expr::BinaryOp { left, op, right } = expr else {
return InvalidPartitionRuleSnafu {
reason: "partition rule must be a binary expression",
}
.fail();
};
let op =
RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
reason: format!("unsupported operator in partition expr {op}"),
})?;
// convert leaf node.
let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
// col, val
(Expr::Identifier(ident), Expr::Value(value)) => {
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
let value = convert_value(value, data_type, timezone, None)?;
(Operand::Column(column_name), op, Operand::Value(value))
}
(Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
if let Expr::Value(v) = &**expr =>
{
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
(Operand::Column(column_name), op, Operand::Value(value))
}
// val, col
(Expr::Value(value), Expr::Identifier(ident)) => {
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
let value = convert_value(value, data_type, timezone, None)?;
(Operand::Value(value), op, Operand::Column(column_name))
}
(Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
if let Expr::Value(v) = &**expr =>
{
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
(Operand::Value(value), op, Operand::Column(column_name))
}
(Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
// sub-expr must against another sub-expr
let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
(Operand::Expr(lhs), op, Operand::Expr(rhs))
}
_ => {
return InvalidPartitionRuleSnafu {
reason: format!("invalid partition expr {expr}"),
}
.fail();
}
};
Ok(PartitionExpr::new(lhs, op, rhs))
}
fn convert_identifier(
ident: &Ident,
column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
let column_name = ident.value.clone();
let data_type = column_name_and_type
.get(&column_name)
.cloned()
.with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
Ok((column_name, data_type))
}
fn convert_value(
value: &ParserValue,
data_type: ConcreteDataType,
timezone: &Timezone,
unary_op: Option<UnaryOperator>,
) -> Result<Value> {
sql_value_to_value("<NONAME>", &data_type, value, Some(timezone), unary_op)
.context(ParseSqlValueSnafu)
}
#[cfg(test)]
mod test {

View File

@@ -14,6 +14,8 @@ common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-time.workspace = true
session.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true

View File

@@ -180,6 +180,26 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Column not found: {}", column))]
ColumnNotFound {
column: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid partition rule: {}", reason))]
InvalidPartitionRule {
reason: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse sql value"))]
ParseSqlValue {
source: sql::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -204,6 +224,9 @@ impl ErrorExt for Error {
Error::TableRouteManager { source, .. } => source.status_code(),
Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),
Error::ConvertToVector { source, .. } => source.status_code(),
Error::ColumnNotFound { .. } => StatusCode::InvalidArguments,
Error::InvalidPartitionRule { .. } => StatusCode::InvalidArguments,
Error::ParseSqlValue { source, .. } => source.status_code(),
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(assert_matches)]
#![feature(if_let_guard)]
//! Structs and traits for partitioning rule.
@@ -22,5 +23,6 @@ pub mod manager;
pub mod multi_dim;
pub mod partition;
pub mod splitter;
pub mod utils;
pub use crate::partition::{PartitionRule, PartitionRuleRef};

238
src/partition/src/utils.rs Normal file
View File

@@ -0,0 +1,238 @@
use api::helper::ColumnDataTypeWrapper;
use api::v1::{column_def, CreateTableExpr};
use common_meta::rpc::router::Partition as MetaPartition;
use common_time::Timezone;
use datafusion_common::HashMap;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use sql::error::ParseSqlValueSnafu;
use sql::statements::create::Partitions;
use sql::statements::sql_value_to_value;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use crate::error::{self, Result};
use crate::expr::{Operand, PartitionExpr, RestrictedOp};
use crate::multi_dim::MultiDimPartitionRule;
use crate::partition::{PartitionBound, PartitionDef};
/// Parse [Partitions] into a group of partition entries.
///
/// Returns a list of [PartitionBound], each of which defines a partition.
fn find_partition_bounds(
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
partition_columns: &[String],
query_ctx: &QueryContextRef,
) -> Result<Vec<Vec<PartitionBound>>> {
let entries = if let Some(partitions) = partitions {
// extract concrete data type of partition columns
let column_defs = partition_columns
.iter()
.map(|pc| {
create_table
.column_defs
.iter()
.find(|c| &c.name == pc)
// unwrap is safe here because we have checked that partition columns are defined
.unwrap()
})
.collect::<Vec<_>>();
let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
for column in column_defs {
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
.unwrap(),
);
column_name_and_type.insert(column_name, data_type);
}
// Transform parser expr to partition expr
let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
for partition in &partitions.exprs {
let partition_expr =
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
}
// fallback for no expr
if partition_exprs.is_empty() {
partition_exprs.push(vec![PartitionBound::MaxValue]);
}
partition_exprs
} else {
vec![vec![PartitionBound::MaxValue]]
};
Ok(entries)
}
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
let columns = if let Some(partitions) = partitions {
partitions
.column_list
.iter()
.map(|x| x.value.clone())
.collect::<Vec<_>>()
} else {
vec![]
};
Ok(columns)
}
fn parse_partition_columns_and_exprs(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: &QueryContextRef,
) -> Result<(Vec<String>, Vec<Vec<PartitionBound>>, Vec<PartitionExpr>)> {
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
// the partition column, and create only one partition.
let partition_columns = find_partition_columns(&partitions)?;
let partition_bounds =
find_partition_bounds(create_table, &partitions, &partition_columns, query_ctx)?;
// Validates partition
let mut exprs = vec![];
for partition in &partition_bounds {
for bound in partition {
if let PartitionBound::Expr(expr) = bound {
exprs.push(expr.clone());
}
}
}
Ok((partition_columns, partition_bounds, exprs))
}
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
pub fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
query_ctx: &QueryContextRef,
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
let (partition_columns, partition_bounds, exprs) =
parse_partition_columns_and_exprs(create_table, partitions, query_ctx)?;
// Check if the partition rule is valid
MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)?;
Ok((
partition_bounds
.into_iter()
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
.collect::<std::result::Result<_, _>>()?,
partition_columns,
))
}
// #[cfg(test)]
// pub(crate) fn parsr_sql_to_partition_rule(
// sql: &str,
// ) -> Result<(Vec<String>, Vec<Vec<PartitionBound>>, Vec<PartitionExpr>)> {
// use session::context::QueryContextBuilder;
// use sql::dialect::GreptimeDbDialect;
// use sql::parser::{ParseOptions, ParserContext};
// use sql::statements::statement::Statement;
// let ctx = QueryContextBuilder::default().build().into();
// let result =
// ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
// .unwrap();
// match &result[0] {
// Statement::CreateTable(c) => {
// let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
// let (partitions, _) = parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
// }
// _ => unreachable!(),
// }
// parse_partition_columns_and_exprs(create_table, partitions, query_ctx)
// }
fn convert_one_expr(
expr: &Expr,
column_name_and_type: &HashMap<&String, ConcreteDataType>,
timezone: &Timezone,
) -> Result<PartitionExpr> {
let Expr::BinaryOp { left, op, right } = expr else {
return error::InvalidPartitionRuleSnafu {
reason: "partition rule must be a binary expression",
}
.fail();
};
let op = RestrictedOp::try_from_parser(&op.clone()).with_context(|| {
error::InvalidPartitionRuleSnafu {
reason: format!("unsupported operator in partition expr {op}"),
}
})?;
// convert leaf node.
let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
// col, val
(Expr::Identifier(ident), Expr::Value(value)) => {
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
let value = convert_value(value, data_type, timezone, None)?;
(Operand::Column(column_name), op, Operand::Value(value))
}
(Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
if let Expr::Value(v) = &**expr =>
{
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
(Operand::Column(column_name), op, Operand::Value(value))
}
// val, col
(Expr::Value(value), Expr::Identifier(ident)) => {
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
let value = convert_value(value, data_type, timezone, None)?;
(Operand::Value(value), op, Operand::Column(column_name))
}
(Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
if let Expr::Value(v) = &**expr =>
{
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
(Operand::Value(value), op, Operand::Column(column_name))
}
(Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
// sub-expr must against another sub-expr
let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
(Operand::Expr(lhs), op, Operand::Expr(rhs))
}
_ => {
return error::InvalidPartitionRuleSnafu {
reason: format!("invalid partition expr {expr}"),
}
.fail();
}
};
Ok(PartitionExpr::new(lhs, op, rhs))
}
fn convert_identifier(
ident: &Ident,
column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
let column_name = ident.value.clone();
let data_type = column_name_and_type
.get(&column_name)
.cloned()
.with_context(|| error::ColumnNotFoundSnafu {
column: column_name.clone(),
})?;
Ok((column_name, data_type))
}
fn convert_value(
value: &ParserValue,
data_type: ConcreteDataType,
timezone: &Timezone,
unary_op: Option<UnaryOperator>,
) -> Result<Value> {
sql_value_to_value("<NONAME>", &data_type, value, Some(timezone), unary_op)
.context(error::ParseSqlValueSnafu)
}