Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-23 22:49:58 +00:00
feat/column-partition:
### Refactor and Enhance Partition Handling

- **Refactor Partition Parsing Logic**: Moved partition parsing logic from `src/operator/src/statement/ddl.rs` to a new utility module, `src/partition/src/utils.rs`. This includes functions such as `parse_partitions`, `find_partition_bounds`, and `convert_one_expr`.
- **Error Handling Improvements**: Added new error variants `ColumnNotFound`, `InvalidPartitionRule`, and `ParseSqlValue` in `src/partition/src/error.rs` to improve error reporting for partition-related operations.
- **Dependency Updates**: Updated `Cargo.lock` and `Cargo.toml` to include the new dependencies `common-time` and `session`.
- **Code Cleanup**: Removed the now-redundant partition parsing functions from `src/operator/src/statement/ddl.rs` and the corresponding error variants from `src/operator/src/error.rs`.
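For orientation before reading the diff: a condensed sketch of how the relocated helper is consumed from the operator side. The `parse_partitions` signature and the `.context(error::InvalidPartitionSnafu)` wrapping are taken from the hunks below; the surrounding `plan_partitions` function is illustrative only and is not part of the change.

```rust
use partition::utils::parse_partitions;
use snafu::ResultExt;

// Illustrative wrapper, not actual GreptimeDB code: shows the call shape that
// the operator's StatementExecutor::create_table now uses (see the ddl.rs hunk below).
fn plan_partitions(
    create_table: &api::v1::CreateTableExpr,
    partitions: Option<sql::statements::create::Partitions>,
    query_ctx: &session::context::QueryContextRef,
) -> crate::error::Result<()> {
    // The helper now lives in the partition crate and returns that crate's error,
    // so the operator wraps failures once with its InvalidPartition variant.
    let (meta_partitions, partition_cols) =
        parse_partitions(create_table, partitions, query_ctx)
            .context(crate::error::InvalidPartitionSnafu)?;
    // meta_partitions: Vec<common_meta::rpc::router::Partition>
    // partition_cols:  Vec<String>
    let _ = (meta_partitions, partition_cols);
    Ok(())
}
```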
Cargo.lock (generated, 2 additions)

@@ -8053,6 +8053,7 @@ dependencies = [
  "common-macro",
  "common-meta",
  "common-query",
+ "common-time",
  "datafusion-common",
  "datafusion-expr",
  "datafusion-physical-expr",
@@ -8060,6 +8061,7 @@ dependencies = [
  "itertools 0.14.0",
  "serde",
  "serde_json",
+ "session",
  "snafu 0.8.5",
  "sql",
  "sqlparser 0.52.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=71dd86058d2af97b9925093d40c4e03360403170)",
src/operator/src/error.rs

@@ -420,13 +420,6 @@ pub enum Error {
         source: datatypes::error::Error,
     },
 
-    #[snafu(display("Failed to deserialize partition in meta to partition def"))]
-    DeserializePartition {
-        #[snafu(implicit)]
-        location: Location,
-        source: partition::error::Error,
-    },
-
     #[snafu(display("Failed to describe schema for given statement"))]
     DescribeStatement {
         #[snafu(implicit)]
@@ -643,7 +636,7 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
 
     #[snafu(display("Failed to parse sql value"))]
     ParseSqlValue {
         source: sql::error::Error,
@@ -725,12 +718,7 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Invalid partition rule: {}", reason))]
-    InvalidPartitionRule {
-        reason: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Invalid configuration value."))]
     InvalidConfigValue {
@@ -923,7 +911,6 @@ impl ErrorExt for Error {
             Error::AlterExprToRequest { source, .. } => source.status_code(),
 
             Error::External { source, .. } => source.status_code(),
-            Error::DeserializePartition { source, .. }
             | Error::FindTablePartitionRule { source, .. }
             | Error::SplitInsert { source, .. }
             | Error::SplitDelete { source, .. }
@@ -947,7 +934,6 @@ impl ErrorExt for Error {
             Error::ColumnDefaultValue { source, .. } => source.status_code(),
 
             Error::EmptyDdlExpr { .. }
-            | Error::InvalidPartitionRule { .. }
             | Error::ParseSqlValue { .. }
             | Error::InvalidTimestampRange { .. } => StatusCode::InvalidArguments,
 
src/operator/src/statement/ddl.rs

@@ -45,6 +45,7 @@ use lazy_static::lazy_static;
 use partition::expr::{Operand, PartitionExpr, RestrictedOp};
 use partition::multi_dim::MultiDimPartitionRule;
 use partition::partition::{PartitionBound, PartitionDef};
+use partition::utils::parse_partitions;
 use query::parser::QueryStatement;
 use query::plan::extract_and_rewrite_full_table_names;
 use query::query_engine::DefaultSerializer;
@@ -71,8 +72,8 @@ use table::TableRef;
 use super::StatementExecutor;
 use crate::error::{
     self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
-    ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
-    EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
+    ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
+    EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu,
     InvalidPartitionSnafu, InvalidSqlSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu,
     InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu,
     SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
@@ -248,7 +249,8 @@ impl StatementExecutor {
             &create_table.table_name,
         );
 
-        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
+        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)
+            .context(error::InvalidPartitionSnafu)?;
         let mut table_info = create_table_info(create_table, partition_cols)?;
 
         let resp = self
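The call-site change above replaces the operator's own partition parsing (and its per-variant error mapping) with a single conversion boundary. A minimal, self-contained sketch of that snafu pattern, using toy error types rather than the real operator and partition errors (which also carry a `Location` and many more variants):

```rust
use snafu::{ResultExt, Snafu};

// Stand-in for partition::error::Error.
#[derive(Debug, Snafu)]
enum PartitionError {
    #[snafu(display("Invalid partition rule: {}", reason))]
    InvalidPartitionRule { reason: String },
}

// Stand-in for the operator's Error with its InvalidPartition variant.
#[derive(Debug, Snafu)]
enum OperatorError {
    #[snafu(display("Invalid partition"))]
    InvalidPartition { source: PartitionError },
}

// Plays the role of partition::utils::parse_partitions: it fails with the
// partition crate's own error type.
fn parse_partitions_toy() -> Result<(), PartitionError> {
    InvalidPartitionRuleSnafu {
        reason: "partition rule must be a binary expression",
    }
    .fail()
}

// Plays the role of the operator call site: one `.context(...)` converts the
// partition error into the operator error, as in the hunk above.
fn create_table_toy() -> Result<(), OperatorError> {
    parse_partitions_toy().context(InvalidPartitionSnafu)?;
    Ok(())
}
```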
@@ -1318,40 +1320,6 @@ impl StatementExecutor {
     }
 }
 
-/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
-fn parse_partitions(
-    create_table: &CreateTableExpr,
-    partitions: Option<Partitions>,
-    query_ctx: &QueryContextRef,
-) -> Result<(Vec<MetaPartition>, Vec<String>)> {
-    // If partitions are not defined by user, use the timestamp column (which has to be existed) as
-    // the partition column, and create only one partition.
-    let partition_columns = find_partition_columns(&partitions)?;
-    let partition_entries =
-        find_partition_entries(create_table, &partitions, &partition_columns, query_ctx)?;
-
-    // Validates partition
-    let mut exprs = vec![];
-    for partition in &partition_entries {
-        for bound in partition {
-            if let PartitionBound::Expr(expr) = bound {
-                exprs.push(expr.clone());
-            }
-        }
-    }
-    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)
-        .context(InvalidPartitionSnafu)?;
-
-    Ok((
-        partition_entries
-            .into_iter()
-            .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
-            .collect::<std::result::Result<_, _>>()
-            .context(DeserializePartitionSnafu)?,
-        partition_columns,
-    ))
-}
-
 fn create_table_info(
     create_table: &CreateTableExpr,
     partition_columns: Vec<String>,
@@ -1438,154 +1406,7 @@ fn create_table_info(
     Ok(table_info)
 }
 
-fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
-    let columns = if let Some(partitions) = partitions {
-        partitions
-            .column_list
-            .iter()
-            .map(|x| x.value.clone())
-            .collect::<Vec<_>>()
-    } else {
-        vec![]
-    };
-    Ok(columns)
-}
-
-/// Parse [Partitions] into a group of partition entries.
-///
-/// Returns a list of [PartitionBound], each of which defines a partition.
-fn find_partition_entries(
-    create_table: &CreateTableExpr,
-    partitions: &Option<Partitions>,
-    partition_columns: &[String],
-    query_ctx: &QueryContextRef,
-) -> Result<Vec<Vec<PartitionBound>>> {
-    let entries = if let Some(partitions) = partitions {
-        // extract concrete data type of partition columns
-        let column_defs = partition_columns
-            .iter()
-            .map(|pc| {
-                create_table
-                    .column_defs
-                    .iter()
-                    .find(|c| &c.name == pc)
-                    // unwrap is safe here because we have checked that partition columns are defined
-                    .unwrap()
-            })
-            .collect::<Vec<_>>();
-        let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
-        for column in column_defs {
-            let column_name = &column.name;
-            let data_type = ConcreteDataType::from(
-                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
-                    .context(ColumnDataTypeSnafu)?,
-            );
-            column_name_and_type.insert(column_name, data_type);
-        }
-
-        // Transform parser expr to partition expr
-        let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
-        for partition in &partitions.exprs {
-            let partition_expr =
-                convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
-            partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
-        }
-
-        // fallback for no expr
-        if partition_exprs.is_empty() {
-            partition_exprs.push(vec![PartitionBound::MaxValue]);
-        }
-
-        partition_exprs
-    } else {
-        vec![vec![PartitionBound::MaxValue]]
-    };
-    Ok(entries)
-}
-
-fn convert_one_expr(
-    expr: &Expr,
-    column_name_and_type: &HashMap<&String, ConcreteDataType>,
-    timezone: &Timezone,
-) -> Result<PartitionExpr> {
-    let Expr::BinaryOp { left, op, right } = expr else {
-        return InvalidPartitionRuleSnafu {
-            reason: "partition rule must be a binary expression",
-        }
-        .fail();
-    };
-
-    let op =
-        RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu {
-            reason: format!("unsupported operator in partition expr {op}"),
-        })?;
-
-    // convert leaf node.
-    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
-        // col, val
-        (Expr::Identifier(ident), Expr::Value(value)) => {
-            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
-            let value = convert_value(value, data_type, timezone, None)?;
-            (Operand::Column(column_name), op, Operand::Value(value))
-        }
-        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
-            if let Expr::Value(v) = &**expr =>
-        {
-            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
-            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
-            (Operand::Column(column_name), op, Operand::Value(value))
-        }
-        // val, col
-        (Expr::Value(value), Expr::Identifier(ident)) => {
-            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
-            let value = convert_value(value, data_type, timezone, None)?;
-            (Operand::Value(value), op, Operand::Column(column_name))
-        }
-        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
-            if let Expr::Value(v) = &**expr =>
-        {
-            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
-            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
-            (Operand::Value(value), op, Operand::Column(column_name))
-        }
-        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
-            // sub-expr must against another sub-expr
-            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
-            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
-            (Operand::Expr(lhs), op, Operand::Expr(rhs))
-        }
-        _ => {
-            return InvalidPartitionRuleSnafu {
-                reason: format!("invalid partition expr {expr}"),
-            }
-            .fail();
-        }
-    };
-
-    Ok(PartitionExpr::new(lhs, op, rhs))
-}
-
-fn convert_identifier(
-    ident: &Ident,
-    column_name_and_type: &HashMap<&String, ConcreteDataType>,
-) -> Result<(String, ConcreteDataType)> {
-    let column_name = ident.value.clone();
-    let data_type = column_name_and_type
-        .get(&column_name)
-        .cloned()
-        .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?;
-    Ok((column_name, data_type))
-}
-
-fn convert_value(
-    value: &ParserValue,
-    data_type: ConcreteDataType,
-    timezone: &Timezone,
-    unary_op: Option<UnaryOperator>,
-) -> Result<Value> {
-    sql_value_to_value("<NONAME>", &data_type, value, Some(timezone), unary_op)
-        .context(ParseSqlValueSnafu)
-}
-
 #[cfg(test)]
 mod test {
src/partition/Cargo.toml

@@ -14,6 +14,8 @@ common-error.workspace = true
 common-macro.workspace = true
 common-meta.workspace = true
 common-query.workspace = true
+common-time.workspace = true
+session.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
 datafusion-physical-expr.workspace = true
src/partition/src/error.rs

@@ -180,6 +180,26 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
 
+    #[snafu(display("Column not found: {}", column))]
+    ColumnNotFound {
+        column: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Invalid partition rule: {}", reason))]
+    InvalidPartitionRule {
+        reason: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to parse sql value"))]
+    ParseSqlValue {
+        source: sql::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 impl ErrorExt for Error {
@@ -204,6 +224,9 @@ impl ErrorExt for Error {
             Error::TableRouteManager { source, .. } => source.status_code(),
             Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),
             Error::ConvertToVector { source, .. } => source.status_code(),
+            Error::ColumnNotFound { .. } => StatusCode::InvalidArguments,
+            Error::InvalidPartitionRule { .. } => StatusCode::InvalidArguments,
+            Error::ParseSqlValue { source, .. } => source.status_code(),
         }
     }
 
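The `ColumnNotFound` variant added here is produced in the new utils.rs (further down) by turning a failed column-type lookup into an error with snafu's `OptionExt::with_context`. A standalone sketch of that pattern with a toy error type (the real variant also carries a `Location`):

```rust
use std::collections::HashMap;

use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Column not found: {}", column))]
    ColumnNotFound { column: String },
}

// A missing entry (None) becomes Err(Error::ColumnNotFound { .. }); the closure
// is only evaluated on the error path.
fn lookup_type(types: &HashMap<String, String>, column: &str) -> Result<String, Error> {
    types
        .get(column)
        .cloned()
        .with_context(|| ColumnNotFoundSnafu { column })
}
```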
src/partition/src/lib.rs

@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #![feature(assert_matches)]
+#![feature(if_let_guard)]
 
 //! Structs and traits for partitioning rule.
 
@@ -22,5 +23,6 @@ pub mod manager;
 pub mod multi_dim;
 pub mod partition;
 pub mod splitter;
+pub mod utils;
 
 pub use crate::partition::{PartitionRule, PartitionRuleRef};
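`#![feature(if_let_guard)]` is added because `convert_one_expr` in the new utils.rs (below) uses `if let` guards on match arms, which are nightly-only. A standalone illustration of the form, unrelated to GreptimeDB's types:

```rust
#![feature(if_let_guard)]

enum Node {
    Leaf(i32),
    Negated(Box<Node>),
}

// The guard both tests and binds: the `Negated` arm is taken only when the inner
// node is a `Leaf`, and `v` is usable in the arm body. utils.rs uses the same
// shape to accept `Expr::UnaryOp { .. }` only when its operand is a literal value.
fn leaf_value(node: &Node) -> Option<i32> {
    match node {
        Node::Leaf(v) => Some(*v),
        Node::Negated(inner) if let Node::Leaf(v) = inner.as_ref() => Some(-*v),
        _ => None,
    }
}
```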
src/partition/src/utils.rs (new file, 238 lines)

@@ -0,0 +1,238 @@
+use api::helper::ColumnDataTypeWrapper;
+use api::v1::{column_def, CreateTableExpr};
+use common_meta::rpc::router::Partition as MetaPartition;
+use common_time::Timezone;
+use datafusion_common::HashMap;
+use datatypes::prelude::ConcreteDataType;
+use datatypes::value::Value;
+use session::context::QueryContextRef;
+use snafu::{OptionExt, ResultExt};
+use sql::error::ParseSqlValueSnafu;
+use sql::statements::create::Partitions;
+use sql::statements::sql_value_to_value;
+use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
+
+use crate::error::{self, Result};
+use crate::expr::{Operand, PartitionExpr, RestrictedOp};
+use crate::multi_dim::MultiDimPartitionRule;
+use crate::partition::{PartitionBound, PartitionDef};
+
+/// Parse [Partitions] into a group of partition entries.
+///
+/// Returns a list of [PartitionBound], each of which defines a partition.
+fn find_partition_bounds(
+    create_table: &CreateTableExpr,
+    partitions: &Option<Partitions>,
+    partition_columns: &[String],
+    query_ctx: &QueryContextRef,
+) -> Result<Vec<Vec<PartitionBound>>> {
+    let entries = if let Some(partitions) = partitions {
+        // extract concrete data type of partition columns
+        let column_defs = partition_columns
+            .iter()
+            .map(|pc| {
+                create_table
+                    .column_defs
+                    .iter()
+                    .find(|c| &c.name == pc)
+                    // unwrap is safe here because we have checked that partition columns are defined
+                    .unwrap()
+            })
+            .collect::<Vec<_>>();
+        let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
+        for column in column_defs {
+            let column_name = &column.name;
+            let data_type = ConcreteDataType::from(
+                ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
+                    .unwrap(),
+            );
+            column_name_and_type.insert(column_name, data_type);
+        }
+
+        // Transform parser expr to partition expr
+        let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
+        for partition in &partitions.exprs {
+            let partition_expr =
+                convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
+            partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
+        }
+
+        // fallback for no expr
+        if partition_exprs.is_empty() {
+            partition_exprs.push(vec![PartitionBound::MaxValue]);
+        }
+
+        partition_exprs
+    } else {
+        vec![vec![PartitionBound::MaxValue]]
+    };
+    Ok(entries)
+}
+
+fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>> {
+    let columns = if let Some(partitions) = partitions {
+        partitions
+            .column_list
+            .iter()
+            .map(|x| x.value.clone())
+            .collect::<Vec<_>>()
+    } else {
+        vec![]
+    };
+    Ok(columns)
+}
+
+fn parse_partition_columns_and_exprs(
+    create_table: &CreateTableExpr,
+    partitions: Option<Partitions>,
+    query_ctx: &QueryContextRef,
+) -> Result<(Vec<String>, Vec<Vec<PartitionBound>>, Vec<PartitionExpr>)> {
+    // If partitions are not defined by user, use the timestamp column (which has to be existed) as
+    // the partition column, and create only one partition.
+    let partition_columns = find_partition_columns(&partitions)?;
+    let partition_bounds =
+        find_partition_bounds(create_table, &partitions, &partition_columns, query_ctx)?;
+
+    // Validates partition
+    let mut exprs = vec![];
+    for partition in &partition_bounds {
+        for bound in partition {
+            if let PartitionBound::Expr(expr) = bound {
+                exprs.push(expr.clone());
+            }
+        }
+    }
+    Ok((partition_columns, partition_bounds, exprs))
+}
+
+/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
+pub fn parse_partitions(
+    create_table: &CreateTableExpr,
+    partitions: Option<Partitions>,
+    query_ctx: &QueryContextRef,
+) -> Result<(Vec<MetaPartition>, Vec<String>)> {
+    let (partition_columns, partition_bounds, exprs) =
+        parse_partition_columns_and_exprs(create_table, partitions, query_ctx)?;
+
+    // Check if the partition rule is valid
+    MultiDimPartitionRule::try_new(partition_columns.clone(), vec![], exprs)?;
+
+    Ok((
+        partition_bounds
+            .into_iter()
+            .map(|x| MetaPartition::try_from(PartitionDef::new(partition_columns.clone(), x)))
+            .collect::<std::result::Result<_, _>>()?,
+        partition_columns,
+    ))
+}
+
+// #[cfg(test)]
+// pub(crate) fn parsr_sql_to_partition_rule(
+//     sql: &str,
+// ) -> Result<(Vec<String>, Vec<Vec<PartitionBound>>, Vec<PartitionExpr>)> {
+//     use session::context::QueryContextBuilder;
+//     use sql::dialect::GreptimeDbDialect;
+//     use sql::parser::{ParseOptions, ParserContext};
+//     use sql::statements::statement::Statement;
+
+//     let ctx = QueryContextBuilder::default().build().into();
+//     let result =
+//         ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+//             .unwrap();
+//     match &result[0] {
+//         Statement::CreateTable(c) => {
+//             let expr = expr_helper::create_to_expr(c, &QueryContext::arc()).unwrap();
+//             let (partitions, _) = parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
+//         }
+//         _ => unreachable!(),
+//     }
+
+//     parse_partition_columns_and_exprs(create_table, partitions, query_ctx)
+// }
+
+fn convert_one_expr(
+    expr: &Expr,
+    column_name_and_type: &HashMap<&String, ConcreteDataType>,
+    timezone: &Timezone,
+) -> Result<PartitionExpr> {
+    let Expr::BinaryOp { left, op, right } = expr else {
+        return error::InvalidPartitionRuleSnafu {
+            reason: "partition rule must be a binary expression",
+        }
+        .fail();
+    };
+
+    let op = RestrictedOp::try_from_parser(&op.clone()).with_context(|| {
+        error::InvalidPartitionRuleSnafu {
+            reason: format!("unsupported operator in partition expr {op}"),
+        }
+    })?;
+
+    // convert leaf node.
+    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
+        // col, val
+        (Expr::Identifier(ident), Expr::Value(value)) => {
+            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
+            let value = convert_value(value, data_type, timezone, None)?;
+            (Operand::Column(column_name), op, Operand::Value(value))
+        }
+        (Expr::Identifier(ident), Expr::UnaryOp { op: unary_op, expr })
+            if let Expr::Value(v) = &**expr =>
+        {
+            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
+            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
+            (Operand::Column(column_name), op, Operand::Value(value))
+        }
+        // val, col
+        (Expr::Value(value), Expr::Identifier(ident)) => {
+            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
+            let value = convert_value(value, data_type, timezone, None)?;
+            (Operand::Value(value), op, Operand::Column(column_name))
+        }
+        (Expr::UnaryOp { op: unary_op, expr }, Expr::Identifier(ident))
+            if let Expr::Value(v) = &**expr =>
+        {
+            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
+            let value = convert_value(v, data_type, timezone, Some(*unary_op))?;
+            (Operand::Value(value), op, Operand::Column(column_name))
+        }
+        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => {
+            // sub-expr must against another sub-expr
+            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
+            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
+            (Operand::Expr(lhs), op, Operand::Expr(rhs))
+        }
+        _ => {
+            return error::InvalidPartitionRuleSnafu {
+                reason: format!("invalid partition expr {expr}"),
+            }
+            .fail();
+        }
+    };
+
+    Ok(PartitionExpr::new(lhs, op, rhs))
+}
+
+fn convert_identifier(
+    ident: &Ident,
+    column_name_and_type: &HashMap<&String, ConcreteDataType>,
+) -> Result<(String, ConcreteDataType)> {
+    let column_name = ident.value.clone();
+    let data_type = column_name_and_type
+        .get(&column_name)
+        .cloned()
+        .with_context(|| error::ColumnNotFoundSnafu {
+            column: column_name.clone(),
+        })?;
+    Ok((column_name, data_type))
+}
+
+fn convert_value(
+    value: &ParserValue,
+    data_type: ConcreteDataType,
+    timezone: &Timezone,
+    unary_op: Option<UnaryOperator>,
+) -> Result<Value> {
+    sql_value_to_value("<NONAME>", &data_type, value, Some(timezone), unary_op)
+        .context(error::ParseSqlValueSnafu)
+}
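For context on how the new module is meant to be exercised: a hedged reconstruction of the commented-out test sketch in utils.rs above. The CREATE TABLE partition syntax and the `expr_helper::create_to_expr` call are assumptions taken from that sketch and from GreptimeDB's documented DDL; `expr_helper` lives in the operator crate, so this flow cannot run inside `src/partition` as-is (likely why the original remains commented out). Treat it as caller-side pseudocode rather than a working test.

```rust
use partition::utils::parse_partitions;
use session::context::QueryContextBuilder;
use sql::dialect::GreptimeDbDialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;

fn example() {
    // Assumed GreptimeDB partition-on-columns syntax.
    let sql = "CREATE TABLE t (a INT, ts TIMESTAMP TIME INDEX) \
               PARTITION ON COLUMNS (a) (a < 10, a >= 10)";

    let ctx = QueryContextBuilder::default().build().into();
    let stmts =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();
    match &stmts[0] {
        Statement::CreateTable(c) => {
            // Assumed operator-crate helper, as referenced in the commented-out sketch.
            let expr = expr_helper::create_to_expr(c, &ctx).unwrap();
            let (meta_partitions, partition_cols) =
                parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
            // One MetaPartition per partition expression over the single column "a".
            assert_eq!(partition_cols, vec!["a".to_string()]);
            assert_eq!(meta_partitions.len(), 2);
        }
        _ => unreachable!(),
    }
}
```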