mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 14:40:01 +00:00
feat/column-partition:
### Refactor and Enhance Partition Handling - **Refactor Partition Parsing Logic**: Moved partition parsing logic from `src/operator/src/statement/ddl.rs` to a new utility module `src/partition/src/utils.rs`. This includes functions like `parse_partitions`, `find_partition_bounds`, and `convert_one_expr`. - **Error Handling Improvements**: Added new error variants `ColumnNotFound`, `InvalidPartitionRule`, and `ParseSqlValue` in `src/partition/src/error.rs` to improve error reporting for partition-related operations. - **Dependency Updates**: Updated `Cargo.lock` and `Cargo.toml` to include new dependencies `common-time` and `session`. - **Code Cleanup**: Removed redundant partition parsing functions from `src/operator/src/error.rs` and `src/operator/src/statement/ddl.rs`.
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -8053,6 +8053,7 @@ dependencies = [
|
||||
"common-macro",
|
||||
"common-meta",
|
||||
"common-query",
|
||||
"common-time",
|
||||
"datafusion-common",
|
||||
"datafusion-expr",
|
||||
"datafusion-physical-expr",
|
||||
@@ -8060,6 +8061,7 @@ dependencies = [
|
||||
"itertools 0.14.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"session",
|
||||
"snafu 0.8.5",
|
||||
"sql",
|
||||
"sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
|
||||
|
||||
@@ -420,13 +420,6 @@ pub enum Error {
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to deserialize partition in meta to partition def"))]
|
||||
DeserializePartition {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: partition::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to describe schema for given statement"))]
|
||||
DescribeStatement {
|
||||
#[snafu(implicit)]
|
||||
@@ -643,7 +636,7 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
|
||||
#[snafu(display("Failed to parse sql value"))]
|
||||
ParseSqlValue {
|
||||
source: sql::error::Error,
|
||||
@@ -725,12 +718,7 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid partition rule: {}", reason))]
|
||||
InvalidPartitionRule {
|
||||
reason: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
|
||||
#[snafu(display("Invalid configuration value."))]
|
||||
InvalidConfigValue {
|
||||
@@ -923,7 +911,6 @@ impl ErrorExt for Error {
|
||||
Error::AlterExprToRequest { source, .. } => source.status_code(),
|
||||
|
||||
Error::External { source, .. } => source.status_code(),
|
||||
Error::DeserializePartition { source, .. }
|
||||
| Error::FindTablePartitionRule { source, .. }
|
||||
| Error::SplitInsert { source, .. }
|
||||
| Error::SplitDelete { source, .. }
|
||||
@@ -947,7 +934,6 @@ impl ErrorExt for Error {
|
||||
Error::ColumnDefaultValue { source, .. } => source.status_code(),
|
||||
|
||||
Error::EmptyDdlExpr { .. }
|
||||
| Error::InvalidPartitionRule { .. }
|
||||
| Error::ParseSqlValue { .. }
|
||||
| Error::InvalidTimestampRange { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
|
||||
@@ -45,6 +45,7 @@ use lazy_static::lazy_static;
|
||||
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
|
||||
use partition::multi_dim::MultiDimPartitionRule;
|
||||
use partition::partition::{PartitionBound, PartitionDef};
|
||||
use partition::utils::parse_partitions;
|
||||
use query::parser::QueryStatement;
|
||||
use query::plan::extract_and_rewrite_full_table_names;
|
||||
use query::query_engine::DefaultSerializer;
|
||||
@@ -71,8 +72,8 @@ use table::TableRef;
|
||||
use super::StatementExecutor;
|
||||
use crate::error::{
|
||||
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
|
||||
ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
|
||||
EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
|
||||
ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
|
||||
EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
|
||||
InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu,
|
||||
InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu,
|
||||
SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
|
||||
@@ -248,7 +249,8 @@ impl StatementExecutor {
|
||||
&create_table.table_name,
|
||||
);
|
||||
|
||||
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
|
||||
let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)
|
||||
.context(error::InvalidPartitionSnafu)?;
|
||||
let mut table_info = create_table_info(create_table, partition_cols)?;
|
||||
|
||||
let resp = self
|
||||
@@ -1318,40 +1320,6 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
|
||||
fn parse_partitions(
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
|
||||
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
|
||||
// the partition column, and create only one partition.
|
||||
let partition_columns = find_partition_columns(&partitions)?;
|
||||
let partition_entries =
|
||||
find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
|
||||
|
||||
// Validates partition
|
||||
let mut exprs = vec![];
|
||||
for partition in &partition_entries {
|
||||
for bound in partition {
|
||||
if let PartitionBound::Expr(expr) = bound {
|
||||
exprs.push(expr.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)
|
||||
.context(InvalidPartitionSnafu)?;
|
||||
|
||||
Ok((
|
||||
partition_entries
|
||||
.into_iter()
|
||||
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
|
||||
.collect::<std::result::Result<_, _>>()
|
||||
.context(DeserializePartitionSnafu)?,
|
||||
partition_columns,
|
||||
))
|
||||
}
|
||||
|
||||
fn create_table_info(
|
||||
create_table: &CreateTableExpr,
|
||||
partition_columns: Vec<String>,
|
||||
@@ -1438,154 +1406,7 @@ fn create_table_info(
|
||||
Ok(table_info)
|
||||
}
|
||||
|
||||
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
|
||||
let columns = if let Some(partitions) = partitions {
|
||||
partitions
|
||||
.column_list
|
||||
.iter()
|
||||
.map(|x| x.value.clone())
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
Ok(columns)
|
||||
}
|
||||
|
||||
/// Parse [Partitions] into a group of partition entries.
|
||||
///
|
||||
/// Returns a list of [PartitionBound], each of which defines a partition.
|
||||
fn find_partition_entries(
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: &Option<Partitions>,
|
||||
partition_columns: &[String],
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<Vec<Vec<PartitionBound>>> {
|
||||
let entries = if let Some(partitions) = partitions {
|
||||
// extract concrete data type of partition columns
|
||||
let column_defs = partition_columns
|
||||
.iter()
|
||||
.map(|pc| {
|
||||
create_table
|
||||
.column_defs
|
||||
.iter()
|
||||
.find(|c| &c.name == pc)
|
||||
// unwrap is safe here because we have checked that partition columns are defined
|
||||
.unwrap()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
|
||||
for column in column_defs {
|
||||
let column_name = &column.name;
|
||||
let data_type = ConcreteDataType::from(
|
||||
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
|
||||
.context(ColumnDataTypeSnafu)?,
|
||||
);
|
||||
column_name_and_type.insert(column_name, data_type);
|
||||
}
|
||||
|
||||
// Transform parser expr to partition expr
|
||||
let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
|
||||
for partition in &partitions.exprs {
|
||||
let partition_expr =
|
||||
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
|
||||
partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
|
||||
}
|
||||
|
||||
// fallback for no expr
|
||||
if partition_exprs.is_empty() {
|
||||
partition_exprs.push(vec![PartitionBound::MaxValue]);
|
||||
}
|
||||
|
||||
partition_exprs
|
||||
} else {
|
||||
vec![vec![PartitionBound::MaxValue]]
|
||||
};
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
fn convert_one_expr(
|
||||
expr: &Expr,
|
||||
column_name_and_type: &HashMap<&String, ConcreteDataType>,
|
||||
timezone: &Timezone,
|
||||
) -> Result<PartitionExpr> {
|
||||
let Expr::BinaryOp { left, op, right } = expr else {
|
||||
return InvalidPartitionRuleSnafu {
|
||||
reason: "partition rule must be a binary expression",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
let op =
|
||||
RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
|
||||
reason: format!("unsupported operator in partition expr {op}"),
|
||||
})?;
|
||||
|
||||
// convert leaf node.
|
||||
let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
|
||||
// col, val
|
||||
(Expr::Identifier(ident), Expr::Value(value)) => {
|
||||
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
|
||||
let value = convert_value(value, data_type, timezone, None)?;
|
||||
(Operand::Column(column_name), op, Operand::Value(value))
|
||||
}
|
||||
(Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
|
||||
if let Expr::Value(v) = &**expr =>
|
||||
{
|
||||
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
|
||||
let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
|
||||
(Operand::Column(column_name), op, Operand::Value(value))
|
||||
}
|
||||
// val, col
|
||||
(Expr::Value(value), Expr::Identifier(ident)) => {
|
||||
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
|
||||
let value = convert_value(value, data_type, timezone, None)?;
|
||||
(Operand::Value(value), op, Operand::Column(column_name))
|
||||
}
|
||||
(Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
|
||||
if let Expr::Value(v) = &**expr =>
|
||||
{
|
||||
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
|
||||
let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
|
||||
(Operand::Value(value), op, Operand::Column(column_name))
|
||||
}
|
||||
(Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
|
||||
// sub-expr must against another sub-expr
|
||||
let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
|
||||
let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
|
||||
(Operand::Expr(lhs), op, Operand::Expr(rhs))
|
||||
}
|
||||
_ => {
|
||||
return InvalidPartitionRuleSnafu {
|
||||
reason: format!("invalid partition expr {expr}"),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
Ok(PartitionExpr::new(lhs, op, rhs))
|
||||
}
|
||||
|
||||
fn convert_identifier(
|
||||
ident: &Ident,
|
||||
column_name_and_type: &HashMap<&String, ConcreteDataType>,
|
||||
) -> Result<(String, ConcreteDataType)> {
|
||||
let column_name = ident.value.clone();
|
||||
let data_type = column_name_and_type
|
||||
.get(&column_name)
|
||||
.cloned()
|
||||
.with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
|
||||
Ok((column_name, data_type))
|
||||
}
|
||||
|
||||
fn convert_value(
|
||||
value: &ParserValue,
|
||||
data_type: ConcreteDataType,
|
||||
timezone: &Timezone,
|
||||
unary_op: Option<UnaryOperator>,
|
||||
) -> Result<Value> {
|
||||
sql_value_to_value("<NONAME>", &data_type, value, Some(timezone), unary_op)
|
||||
.context(ParseSqlValueSnafu)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
@@ -14,6 +14,8 @@ common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
common-meta.workspace = true
|
||||
common-query.workspace = true
|
||||
common-time.workspace = true
|
||||
session.workspace = true
|
||||
datafusion-common.workspace = true
|
||||
datafusion-expr.workspace = true
|
||||
datafusion-physical-expr.workspace = true
|
||||
|
||||
@@ -180,6 +180,26 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Column not found: {}", column))]
|
||||
ColumnNotFound {
|
||||
column: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid partition rule: {}", reason))]
|
||||
InvalidPartitionRule {
|
||||
reason: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Failed to parse sql value"))]
|
||||
ParseSqlValue {
|
||||
source: sql::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -204,6 +224,9 @@ impl ErrorExt for Error {
|
||||
Error::TableRouteManager { source, .. } => source.status_code(),
|
||||
Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),
|
||||
Error::ConvertToVector { source, .. } => source.status_code(),
|
||||
Error::ColumnNotFound { .. } => StatusCode::InvalidArguments,
|
||||
Error::InvalidPartitionRule { .. } => StatusCode::InvalidArguments,
|
||||
Error::ParseSqlValue { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
#![feature(assert_matches)]
|
||||
#![feature(if_let_guard)]
|
||||
|
||||
//! Structs and traits for partitioning rule.
|
||||
|
||||
@@ -22,5 +23,6 @@ pub mod manager;
|
||||
pub mod multi_dim;
|
||||
pub mod partition;
|
||||
pub mod splitter;
|
||||
pub mod utils;
|
||||
|
||||
pub use crate::partition::{PartitionRule, PartitionRuleRef};
|
||||
|
||||
238
src/partition/src/utils.rs
Normal file
238
src/partition/src/utils.rs
Normal file
@@ -0,0 +1,238 @@
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::{column_def, CreateTableExpr};
|
||||
use common_meta::rpc::router::Partition as MetaPartition;
|
||||
use common_time::Timezone;
|
||||
use datafusion_common::HashMap;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::value::Value;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use sql::error::ParseSqlValueSnafu;
|
||||
use sql::statements::create::Partitions;
|
||||
use sql::statements::sql_value_to_value;
|
||||
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::expr::{Operand, PartitionExpr, RestrictedOp};
|
||||
use crate::multi_dim::MultiDimPartitionRule;
|
||||
use crate::partition::{PartitionBound, PartitionDef};
|
||||
|
||||
/// Parse [Partitions] into a group of partition entries.
|
||||
///
|
||||
/// Returns a list of [PartitionBound], each of which defines a partition.
|
||||
fn find_partition_bounds(
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: &Option<Partitions>,
|
||||
partition_columns: &[String],
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<Vec<Vec<PartitionBound>>> {
|
||||
let entries = if let Some(partitions) = partitions {
|
||||
// extract concrete data type of partition columns
|
||||
let column_defs = partition_columns
|
||||
.iter()
|
||||
.map(|pc| {
|
||||
create_table
|
||||
.column_defs
|
||||
.iter()
|
||||
.find(|c| &c.name == pc)
|
||||
// unwrap is safe here because we have checked that partition columns are defined
|
||||
.unwrap()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
|
||||
for column in column_defs {
|
||||
let column_name = &column.name;
|
||||
let data_type = ConcreteDataType::from(
|
||||
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
|
||||
.unwrap(),
|
||||
);
|
||||
column_name_and_type.insert(column_name, data_type);
|
||||
}
|
||||
|
||||
// Transform parser expr to partition expr
|
||||
let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
|
||||
for partition in &partitions.exprs {
|
||||
let partition_expr =
|
||||
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
|
||||
partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
|
||||
}
|
||||
|
||||
// fallback for no expr
|
||||
if partition_exprs.is_empty() {
|
||||
partition_exprs.push(vec![PartitionBound::MaxValue]);
|
||||
}
|
||||
|
||||
partition_exprs
|
||||
} else {
|
||||
vec![vec![PartitionBound::MaxValue]]
|
||||
};
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
|
||||
let columns = if let Some(partitions) = partitions {
|
||||
partitions
|
||||
.column_list
|
||||
.iter()
|
||||
.map(|x| x.value.clone())
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
Ok(columns)
|
||||
}
|
||||
|
||||
fn parse_partition_columns_and_exprs(
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(Vec<String>, Vec<Vec<PartitionBound>>, Vec<PartitionExpr>)> {
|
||||
// If partitions are not defined by user, use the timestamp column (which has to be existed) as
|
||||
// the partition column, and create only one partition.
|
||||
let partition_columns = find_partition_columns(&partitions)?;
|
||||
let partition_bounds =
|
||||
find_partition_bounds(create_table, &partitions, &partition_columns, query_ctx)?;
|
||||
|
||||
// Validates partition
|
||||
let mut exprs = vec![];
|
||||
for partition in &partition_bounds {
|
||||
for bound in partition {
|
||||
if let PartitionBound::Expr(expr) = bound {
|
||||
exprs.push(expr.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok((partition_columns, partition_bounds, exprs))
|
||||
}
|
||||
|
||||
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
|
||||
pub fn parse_partitions(
|
||||
create_table: &CreateTableExpr,
|
||||
partitions: Option<Partitions>,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(Vec<MetaPartition>, Vec<String>)> {
|
||||
let (partition_columns, partition_bounds, exprs) =
|
||||
parse_partition_columns_and_exprs(create_table, partitions, query_ctx)?;
|
||||
|
||||
// Check if the partition rule is valid
|
||||
MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)?;
|
||||
|
||||
Ok((
|
||||
partition_bounds
|
||||
.into_iter()
|
||||
.map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
|
||||
.collect::<std::result::Result<_, _>>()?,
|
||||
partition_columns,
|
||||
))
|
||||
}
|
||||
|
||||
// #[cfg(test)]
|
||||
// pub(crate) fn parsr_sql_to_partition_rule(
|
||||
// sql: &str,
|
||||
// ) -> Result<(Vec<String>, Vec<Vec<PartitionBound>>, Vec<PartitionExpr>)> {
|
||||
// use session::context::QueryContextBuilder;
|
||||
// use sql::dialect::GreptimeDbDialect;
|
||||
// use sql::parser::{ParseOptions, ParserContext};
|
||||
// use sql::statements::statement::Statement;
|
||||
|
||||
// let ctx = QueryContextBuilder::default().build().into();
|
||||
// let result =
|
||||
// ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
// .unwrap();
|
||||
// match &result[0] {
|
||||
// Statement::CreateTable(c) => {
|
||||
// let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
|
||||
// let (partitions, _) = parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
|
||||
// }
|
||||
// _ => unreachable!(),
|
||||
// }
|
||||
|
||||
// parse_partition_columns_and_exprs(create_table, partitions, query_ctx)
|
||||
// }
|
||||
|
||||
fn convert_one_expr(
|
||||
expr: &Expr,
|
||||
column_name_and_type: &HashMap<&String, ConcreteDataType>,
|
||||
timezone: &Timezone,
|
||||
) -> Result<PartitionExpr> {
|
||||
let Expr::BinaryOp { left, op, right } = expr else {
|
||||
return error::InvalidPartitionRuleSnafu {
|
||||
reason: "partition rule must be a binary expression",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
let op = RestrictedOp::try_from_parser(&op.clone()).with_context(|| {
|
||||
error::InvalidPartitionRuleSnafu {
|
||||
reason: format!("unsupported operator in partition expr {op}"),
|
||||
}
|
||||
})?;
|
||||
|
||||
// convert leaf node.
|
||||
let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
|
||||
// col, val
|
||||
(Expr::Identifier(ident), Expr::Value(value)) => {
|
||||
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
|
||||
let value = convert_value(value, data_type, timezone, None)?;
|
||||
(Operand::Column(column_name), op, Operand::Value(value))
|
||||
}
|
||||
(Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
|
||||
if let Expr::Value(v) = &**expr =>
|
||||
{
|
||||
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
|
||||
let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
|
||||
(Operand::Column(column_name), op, Operand::Value(value))
|
||||
}
|
||||
// val, col
|
||||
(Expr::Value(value), Expr::Identifier(ident)) => {
|
||||
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
|
||||
let value = convert_value(value, data_type, timezone, None)?;
|
||||
(Operand::Value(value), op, Operand::Column(column_name))
|
||||
}
|
||||
(Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
|
||||
if let Expr::Value(v) = &**expr =>
|
||||
{
|
||||
let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
|
||||
let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
|
||||
(Operand::Value(value), op, Operand::Column(column_name))
|
||||
}
|
||||
(Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
|
||||
// sub-expr must against another sub-expr
|
||||
let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
|
||||
let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
|
||||
(Operand::Expr(lhs), op, Operand::Expr(rhs))
|
||||
}
|
||||
_ => {
|
||||
return error::InvalidPartitionRuleSnafu {
|
||||
reason: format!("invalid partition expr {expr}"),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
|
||||
Ok(PartitionExpr::new(lhs, op, rhs))
|
||||
}
|
||||
|
||||
fn convert_identifier(
|
||||
ident: &Ident,
|
||||
column_name_and_type: &HashMap<&String, ConcreteDataType>,
|
||||
) -> Result<(String, ConcreteDataType)> {
|
||||
let column_name = ident.value.clone();
|
||||
let data_type = column_name_and_type
|
||||
.get(&column_name)
|
||||
.cloned()
|
||||
.with_context(|| error::ColumnNotFoundSnafu {
|
||||
column: column_name.clone(),
|
||||
})?;
|
||||
Ok((column_name, data_type))
|
||||
}
|
||||
|
||||
fn convert_value(
|
||||
value: &ParserValue,
|
||||
data_type: ConcreteDataType,
|
||||
timezone: &Timezone,
|
||||
unary_op: Option<UnaryOperator>,
|
||||
) -> Result<Value> {
|
||||
sql_value_to_value("<NONAME>", &data_type, value, Some(timezone), unary_op)
|
||||
.context(error::ParseSqlValueSnafu)
|
||||
}
|
||||
Reference in New Issue
Block a user