Compare commits


15 Commits

Author SHA1 Message Date
discord9
365421c452 Revert "feat: stream drop record metrics"
This reverts commit 3eda4a2257928d95cf9c1328ae44fae84cfbb017.

Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
b361c5f50a chore: more dbg
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
5d78bc1efa test: update sqlness
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
99c78b2f97 fix: expand differently
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
bdecdb869e feat: stream drop record metrics
Signed-off-by: discord9 <discord9@163.com>

refactor: move logging to drop too

Signed-off-by: discord9 <discord9@163.com>

fix: drop input stream before collect metrics

Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
df4cd157e1 Revert "feat: stream drop record metrics"
This reverts commit 6a16946a5b8ea37557bbb1b600847d24274d6500.

Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
60fbe54f90 feat: stream drop record metrics
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
7c15e71407 revert
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
13f20b5a40 add logging to figure out test failure
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
Ruihang Xia
2b802e45f5 update sqlness result
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
Ruihang Xia
def9b7c01d fix: expand on conditional commutative as well
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
af03e89139 fix: stricter win sort condition (#6477)
test: sqlness

test: fix sqlness redacted

Signed-off-by: discord9 <discord9@163.com>
2025-07-08 22:27:17 +00:00
jeremyhi
e7a64b7dc0 chore: refactor register_region method to avoid TOCTOU issues (#6468) 2025-07-08 13:26:38 +00:00
Lin Yihai
29739b556e refactor: split some convert function into sql-common crate (#6452)
refactor: split some convert functions into the `sql-common` crate

Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>
2025-07-08 12:08:33 +00:00
Lei, HUANG
77e50d0e08 chore: expose some config (#6479)
refactor/expose-config:
 ### Make SubCommand and Fields Public in `frontend.rs`

 - Made `subcmd` field in `Command` struct public.
 - Made `SubCommand` enum public.
 - Made `config_file` and `env_prefix` fields in `StartCommand` struct public.

 These changes enhance the accessibility of command-related structures and fields, facilitating external usage and integration.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-07-08 11:52:23 +00:00
25 changed files with 1794 additions and 1330 deletions

Cargo.lock generated

@@ -2668,6 +2668,24 @@ dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-sql"
version = "0.16.0"
dependencies = [
"common-base",
"common-datasource",
"common-decimal",
"common-error",
"common-macro",
"common-time",
"datafusion-sql",
"datatypes",
"hex",
"jsonb",
"snafu 0.8.5",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
]
[[package]]
name = "common-telemetry"
version = "0.16.0"
@@ -8457,6 +8475,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"common-runtime",
"common-sql",
"common-telemetry",
"common-test-util",
"common-time",
@@ -11243,6 +11262,7 @@ dependencies = [
"common-recordbatch",
"common-runtime",
"common-session",
"common-sql",
"common-telemetry",
"common-test-util",
"common-time",
@@ -11681,6 +11701,7 @@ dependencies = [
"common-error",
"common-macro",
"common-query",
"common-sql",
"common-time",
"datafusion",
"datafusion-common",

View File

@@ -30,6 +30,7 @@ members = [
"src/common/recordbatch",
"src/common/runtime",
"src/common/session",
"src/common/sql",
"src/common/stat",
"src/common/substrait",
"src/common/telemetry",
@@ -262,6 +263,7 @@ common-query = { path = "src/common/query" }
common-recordbatch = { path = "src/common/recordbatch" }
common-runtime = { path = "src/common/runtime" }
common-session = { path = "src/common/session" }
common-sql = { path = "src/common/sql" }
common-telemetry = { path = "src/common/telemetry" }
common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }

src/common/sql/Cargo.toml Normal file

@@ -0,0 +1,22 @@
[package]
name = "common-sql"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
common-base.workspace = true
common-datasource.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-time.workspace = true
datafusion-sql.workspace = true
datatypes.workspace = true
hex = "0.4"
jsonb.workspace = true
snafu.workspace = true
sqlparser.workspace = true
[lints]
workspace = true

src/common/sql/src/convert.rs (file diff suppressed because it is too large)

src/common/sql/src/default_constraint.rs Normal file

@@ -0,0 +1,182 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::timezone::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::ColumnDefaultConstraint;
pub use sqlparser::ast::{
visit_expressions_mut, visit_statements_mut, BinaryOperator, ColumnDef, ColumnOption,
ColumnOptionDef, DataType, Expr, Function, FunctionArg, FunctionArgExpr, FunctionArguments,
Ident, ObjectName, SqlOption, TableConstraint, TimezoneInfo, UnaryOperator, Value as SqlValue,
Visit, VisitMut, Visitor, VisitorMut,
};
use crate::convert::{sql_number_to_value, sql_value_to_value};
use crate::error::{Result, UnsupportedDefaultValueSnafu};
pub fn parse_column_default_constraint(
column_name: &str,
data_type: &ConcreteDataType,
opts: &[ColumnOptionDef],
timezone: Option<&Timezone>,
) -> Result<Option<ColumnDefaultConstraint>> {
if let Some(opt) = opts
.iter()
.find(|o| matches!(o.option, ColumnOption::Default(_)))
{
let default_constraint = match &opt.option {
ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value(
sql_value_to_value(column_name, data_type, v, timezone, None, false)?,
),
ColumnOption::Default(Expr::Function(func)) => {
let mut func = format!("{func}").to_lowercase();
// normalize CURRENT_TIMESTAMP to CURRENT_TIMESTAMP()
if func == CURRENT_TIMESTAMP {
func = CURRENT_TIMESTAMP_FN.to_string();
}
// Always use lowercase for function expression
ColumnDefaultConstraint::Function(func.to_lowercase())
}
ColumnOption::Default(Expr::UnaryOp { op, expr }) => {
// Specialized process for handling numerical inputs to prevent
// overflow errors during the parsing of negative numbers,
// See https://github.com/GreptimeTeam/greptimedb/issues/4351
if let (UnaryOperator::Minus, Expr::Value(SqlValue::Number(n, _))) =
(op, expr.as_ref())
{
return Ok(Some(ColumnDefaultConstraint::Value(sql_number_to_value(
data_type,
&format!("-{n}"),
)?)));
}
if let Expr::Value(v) = &**expr {
let value =
sql_value_to_value(column_name, data_type, v, timezone, Some(*op), false)?;
ColumnDefaultConstraint::Value(value)
} else {
return UnsupportedDefaultValueSnafu {
column_name,
expr: *expr.clone(),
}
.fail();
}
}
ColumnOption::Default(others) => {
return UnsupportedDefaultValueSnafu {
column_name,
expr: others.clone(),
}
.fail();
}
_ => {
return UnsupportedDefaultValueSnafu {
column_name,
expr: Expr::Value(SqlValue::Null),
}
.fail();
}
};
Ok(Some(default_constraint))
} else {
Ok(None)
}
}
#[cfg(test)]
mod test {
use std::assert_matches::assert_matches;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::types::BooleanType;
use super::*;
#[test]
pub fn test_parse_column_default_constraint() {
let bool_value = sqlparser::ast::Value::Boolean(true);
let opts = vec![
ColumnOptionDef {
name: None,
option: ColumnOption::Default(Expr::Value(bool_value)),
},
ColumnOptionDef {
name: None,
option: ColumnOption::NotNull,
},
];
let constraint = parse_column_default_constraint(
"coll",
&ConcreteDataType::Boolean(BooleanType),
&opts,
None,
)
.unwrap();
assert_matches!(
constraint,
Some(ColumnDefaultConstraint::Value(Value::Boolean(true)))
);
// Test negative number
let opts = vec![ColumnOptionDef {
name: None,
option: ColumnOption::Default(Expr::UnaryOp {
op: UnaryOperator::Minus,
expr: Box::new(Expr::Value(SqlValue::Number("32768".to_string(), false))),
}),
}];
let constraint = parse_column_default_constraint(
"coll",
&ConcreteDataType::int16_datatype(),
&opts,
None,
)
.unwrap();
assert_matches!(
constraint,
Some(ColumnDefaultConstraint::Value(Value::Int16(-32768)))
);
}
#[test]
fn test_incorrect_default_value_issue_3479() {
let opts = vec![ColumnOptionDef {
name: None,
option: ColumnOption::Default(Expr::Value(SqlValue::Number(
"0.047318541668048164".into(),
false,
))),
}];
let constraint = parse_column_default_constraint(
"coll",
&ConcreteDataType::float64_datatype(),
&opts,
None,
)
.unwrap()
.unwrap();
assert_eq!("0.047318541668048164", constraint.to_string());
let encoded: Vec<u8> = constraint.clone().try_into().unwrap();
let decoded = ColumnDefaultConstraint::try_from(encoded.as_ref()).unwrap();
assert_eq!(decoded, constraint);
}
}
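
For context on the `UnaryOp` branch above: parsing a negative literal's magnitude first and negating afterwards overflows at the type's minimum value, which is why the code folds the minus sign into the string (`-{n}`) before conversion (issue #4351 referenced in the comment). A minimal standalone illustration of the failure mode, using plain std parsing rather than GreptimeDB's converter:

// Sketch: i16::MAX is 32767, so the magnitude of i16::MIN cannot be
// parsed on its own; the sign must be part of the parsed string.
fn main() {
    assert!("32768".parse::<i16>().is_err()); // magnitude alone overflows
    assert_eq!("-32768".parse::<i16>().unwrap(), i16::MIN); // signed parse succeeds
}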

src/common/sql/src/error.rs Normal file

@@ -0,0 +1,158 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_sql::sqlparser::ast::UnaryOperator;
use datatypes::prelude::{ConcreteDataType, Value};
use snafu::{Location, Snafu};
pub use sqlparser::ast::{Expr, Value as SqlValue};
pub type Result<T> = std::result::Result<T, Error>;
/// SQL parser errors.
// These errors deliberately carry no backtrace, to avoid the cost of capturing
// one every time the parser rejects an invalid SQL statement.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Column {} expect type: {:?}, actual: {:?}",
column_name,
expect,
actual,
))]
ColumnTypeMismatch {
column_name: String,
expect: ConcreteDataType,
actual: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse value: {}", msg))]
ParseSqlValue {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Unsupported expr in default constraint: {:?} for column: {}",
expr,
column_name
))]
UnsupportedDefaultValue {
column_name: String,
expr: Expr,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unable to convert sql value {} to datatype {:?}", value, datatype))]
ConvertSqlValue {
value: SqlValue,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid sql value: {}", value))]
InvalidSqlValue {
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported unary operator {}", unary_op))]
UnsupportedUnaryOp {
unary_op: UnaryOperator,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid unary operator {} for value {}", unary_op, value))]
InvalidUnaryOp {
unary_op: UnaryOperator,
value: Value,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to cast SQL value {} to datatype {}", sql_value, datatype))]
InvalidCast {
sql_value: sqlparser::ast::Value,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Unable to convert {} to datatype {:?}", value, datatype))]
ConvertStr {
value: String,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Converting timestamp {:?} to unit {:?} overflow",
timestamp,
target_unit
))]
TimestampOverflow {
timestamp: Timestamp,
target_unit: TimeUnit,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Datatype error: {}", source))]
Datatype {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
UnsupportedDefaultValue { .. } => StatusCode::Unsupported,
ParseSqlValue { .. } => StatusCode::InvalidSyntax,
ColumnTypeMismatch { .. }
| InvalidSqlValue { .. }
| UnsupportedUnaryOp { .. }
| InvalidUnaryOp { .. }
| InvalidCast { .. }
| ConvertStr { .. }
| TimestampOverflow { .. } => StatusCode::InvalidArguments,
Datatype { source, .. } => source.status_code(),
ConvertSqlValue { .. } => StatusCode::Unsupported,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
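
A hedged sketch of how a caller constructs and classifies one of these errors; the selector name and fields come from the enum above, while the `main` wiring is assumed:

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_sql::error::{Result, UnsupportedDefaultValueSnafu};
use sqlparser::ast::{Expr, Value as SqlValue};

// Reject a default value via the snafu context selector generated above.
fn reject_default(column_name: &str) -> Result<()> {
    UnsupportedDefaultValueSnafu {
        column_name,
        expr: Expr::Value(SqlValue::Null),
    }
    .fail()
}

fn main() {
    let err = reject_default("c").unwrap_err();
    // Matches the status_code arm: UnsupportedDefaultValue => Unsupported.
    assert!(matches!(err.status_code(), StatusCode::Unsupported));
}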

src/common/sql/src/lib.rs Normal file

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
pub mod convert;
pub mod default_constraint;
pub mod error;
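
Call sites later in this diff switch from `sql::statements::sql_value_to_value` to the relocated helper. A usage sketch under the six-argument signature visible at those call sites (the column name and literal are made up):

use common_sql::convert::sql_value_to_value;
use datatypes::prelude::{ConcreteDataType, Value};
use sqlparser::ast::Value as SqlValue;

fn main() -> common_sql::error::Result<()> {
    // Convert the SQL literal 42 into a typed engine value.
    let v = sql_value_to_value(
        "my_col",                            // column name, used in error messages
        &ConcreteDataType::int64_datatype(), // target type
        &SqlValue::Number("42".to_string(), false),
        None,  // timezone
        None,  // no unary operator applied
        false, // auto_string_to_numeric
    )?;
    assert_eq!(v, Value::Int64(42));
    Ok(())
}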


@@ -86,26 +86,33 @@ impl RegionAliveKeeper {
/// Add the countdown task for a specific region.
/// It will be ignored if the task exists.
pub async fn register_region(&self, region_id: RegionId) {
-        if self.find_handle(region_id).await.is_some() {
-            return;
-        }
         let handle = Arc::new(CountdownTaskHandle::new(
             self.region_server.clone(),
             self.countdown_task_handler_ext.clone(),
             region_id,
         ));
-        let mut handles = self.tasks.lock().await;
-        let _ = handles.insert(region_id, handle.clone());
-        if self.started.load(Ordering::Relaxed) {
+        let should_start = {
+            let mut handles = self.tasks.lock().await;
+            // Check if already exists, return early if so
+            if handles.contains_key(&region_id) {
+                return;
+            }
+            // Insert new handle
+            handles.insert(region_id, handle.clone());
+            // Return whether we should start (check state inside lock)
+            self.started.load(Ordering::Relaxed)
+        };
+        if should_start {
             handle.start(self.heartbeat_interval_millis).await;
-            info!("Region alive countdown for region {region_id} is started!",);
+            info!("Region alive countdown for region {region_id} is started!");
         } else {
             info!(
-                "Region alive countdown for region {region_id} is registered but not started yet!",
+                "Region alive countdown for region {region_id} is registered but not started yet!"
             );
         }
     }
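
For context on #6468: the removed lines checked `find_handle` and inserted the new handle under separate lock acquisitions, so two concurrent `register_region` calls could both observe "absent" and both proceed, which is the time-of-check-to-time-of-use race the commit title names. The fix moves the check, the insert, and the `started` read under one lock. A minimal standalone sketch of the pattern, with a plain `Mutex<HashMap>` standing in for the real task map (all names hypothetical):

use std::collections::HashMap;
use std::sync::Mutex;

// Check-then-insert must happen under a single lock acquisition; otherwise
// two callers can both see "absent" and both register.
fn register(tasks: &Mutex<HashMap<u64, String>>, id: u64, handle: String) -> bool {
    let mut map = tasks.lock().unwrap();
    if map.contains_key(&id) {
        return false; // already registered; nothing to start
    }
    map.insert(id, handle);
    true // caller may start the task it just registered
}

fn main() {
    let tasks = Mutex::new(HashMap::new());
    assert!(register(&tasks, 1, "countdown".to_string()));
    assert!(!register(&tasks, 1, "countdown".to_string()));
}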


@@ -35,6 +35,7 @@ common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-sql.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true


@@ -844,6 +844,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Sql common error"))]
SqlCommon {
source: common_sql::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -972,6 +979,7 @@ impl ErrorExt for Error {
Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
Error::PathNotFound { .. } => StatusCode::InvalidArguments,
Error::SqlCommon { source, .. } => source.status_code(),
}
}


@@ -22,7 +22,6 @@ use datatypes::schema::{ColumnSchema, SchemaRef};
use partition::manager::PartitionRuleManager;
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements;
use sql::statements::insert::Insert;
use sqlparser::ast::{ObjectName, Value as SqlValue};
use table::metadata::TableInfoRef;
@@ -227,7 +226,7 @@ fn sql_value_to_grpc_value(
column: column.clone(),
})?
} else {
statements::sql_value_to_value(
common_sql::convert::sql_value_to_value(
column,
&column_schema.data_type,
sql_val,
@@ -235,7 +234,7 @@ fn sql_value_to_grpc_value(
None,
auto_string_to_numeric,
)
.context(ParseSqlSnafu)?
.context(crate::error::SqlCommonSnafu)?
};
let grpc_value = value_to_grpc_value(value);


@@ -19,6 +19,7 @@ use common_function::function_registry::FUNCTION_REGISTRY;
use common_query::prelude::TypeSignature;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_sql::convert::sql_value_to_value;
use common_telemetry::tracing;
use common_time::Timezone;
use datatypes::data_type::DataType;
@@ -30,7 +31,6 @@ use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{Expr, FunctionArg, FunctionArgExpr, FunctionArguments, Value as SqlValue};
use sql::statements::admin::Admin;
use sql::statements::sql_value_to_value;
use crate::error::{self, Result};
use crate::statement::StatementExecutor;
@@ -186,7 +186,7 @@ fn values_to_vectors_by_exact_types(
.zip(exact_types.iter())
.map(|(value, data_type)| {
let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
.context(error::ParseSqlValueSnafu)?;
.context(error::SqlCommonSnafu)?;
Ok(value_to_vector(value))
})


@@ -45,6 +45,7 @@ use common_meta::rpc::ddl::{
};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::Timezone;
use datafusion_common::tree_node::TreeNodeVisitor;
@@ -71,7 +72,6 @@ use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
@@ -87,10 +87,10 @@ use crate::error::{
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, Result, SchemaInUseSnafu,
SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::show::create_partitions_stmt;
@@ -1859,7 +1859,7 @@ fn convert_value(
unary_op,
false,
)
.context(ParseSqlValueSnafu)
.context(error::SqlCommonSnafu)
}
#[cfg(test)]


@@ -155,7 +155,23 @@ struct PlanRewriter {
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
column_requirements: HashSet<Column>,
/// Whether to expand on the next call.
/// This is used to handle the case where a plan is transformed but needs to be expanded from its
/// parent node. For example, an Aggregate plan is split into two parts between frontend and
/// datanode, and needs to be expanded from the parent node of the Aggregate plan.
expand_on_next_call: bool,
/// Expand on the next partial/conditional/transformed commutative plan.
/// This is used to handle the case where a plan is transformed but still
/// needs to push down as many nodes as possible before the next partial/conditional/transformed
/// commutative plan. For example:
/// ```
/// Limit:
/// Sort:
/// ```
/// where `Limit` is partial commutative, and `Sort` is conditional commutative.
/// In this case, we need to expand the `Limit` plan,
/// so that we can push down the `Sort` plan as much as possible.
expand_on_next_part_cond_trans_commutative: bool,
new_child_plan: Option<LogicalPlan>,
}
@@ -177,15 +193,38 @@ impl PlanRewriter {
{
return true;
}
if self.expand_on_next_call {
self.expand_on_next_call = false;
return true;
}
if self.expand_on_next_part_cond_trans_commutative {
let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
match comm {
Commutativity::PartialCommutative => {
// a small difference is that for partial commutative, we still need to
// expand on the next call (so `Limit` can be pushed down)
self.expand_on_next_part_cond_trans_commutative = false;
self.expand_on_next_call = true;
}
Commutativity::ConditionalCommutative(_)
| Commutativity::TransformedCommutative { .. } => {
// for conditional commutative and transformed commutative, we can
// expand now
self.expand_on_next_part_cond_trans_commutative = false;
return true;
}
_ => (),
}
}
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -194,6 +233,7 @@ impl PlanRewriter {
&& let Some(plan) = transformer(plan)
{
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -202,7 +242,7 @@ impl PlanRewriter {
&& let Some(transformer_actions) = transformer(plan)
{
debug!(
"PlanRewriter: transformed plan: {:#?}\n from {plan}",
"PlanRewriter: transformed plan: {:?}\n from {plan}",
transformer_actions.extra_parent_plans
);
if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
@@ -226,6 +266,10 @@ impl PlanRewriter {
}
fn update_column_requirements(&mut self, plan: &LogicalPlan) {
debug!(
"PlanRewriter: update column requirements for plan: {plan}\n withcolumn_requirements: {:?}",
self.column_requirements
);
let mut container = HashSet::new();
for expr in plan.expressions() {
// this method won't fail
@@ -235,6 +279,10 @@ impl PlanRewriter {
for col in container {
self.column_requirements.insert(col);
}
debug!(
"PlanRewriter: updated column requirements: {:?}",
self.column_requirements
);
}
fn is_expanded(&self) -> bool {
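
A toy reduction of the flag handoff described in the comments above: `should_expand`'s control flow with the plan tree abstracted away. The enum and function here are illustrative stand-ins, not the real `Commutativity`/`Categorizer` API:

#[derive(Clone, Copy)]
enum Comm {
    Commutative,
    PartialCommutative,     // e.g. Limit
    ConditionalCommutative, // e.g. Sort
}

// Decide whether to stop pushing down (expand) at the current plan node.
fn should_expand(comm: Comm, expand_next: &mut bool, armed: &mut bool) -> bool {
    if *expand_next {
        *expand_next = false;
        return true;
    }
    if *armed {
        match comm {
            // Partial commutative: keep it, but expand on the very next call
            // (so `Limit` itself can still be pushed down).
            Comm::PartialCommutative => {
                *armed = false;
                *expand_next = true;
            }
            // Conditional commutative: expand right here.
            Comm::ConditionalCommutative => {
                *armed = false;
                return true;
            }
            _ => {}
        }
    }
    match comm {
        // First partial/conditional node seen: push it down and arm the flag.
        Comm::PartialCommutative | Comm::ConditionalCommutative => {
            *armed = true;
            false
        }
        Comm::Commutative => false,
    }
}

fn main() {
    let (mut next, mut armed) = (false, false);
    // Limit (partial commutative) is pushed down and arms the flag...
    assert!(!should_expand(Comm::PartialCommutative, &mut next, &mut armed));
    // ...so the next partial/conditional node (Sort) triggers expansion here.
    assert!(should_expand(Comm::ConditionalCommutative, &mut next, &mut armed));
}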


@@ -181,6 +181,15 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
is_batch_coalesced = true;
}
// only a very limited set of plans can exist between the region scan and the sort exec;
// other plans might make this optimization incorrect, so be safe and bail out here
if !(plan.as_any().is::<ProjectionExec>()
|| plan.as_any().is::<FilterExec>()
|| plan.as_any().is::<CoalesceBatchesExec>())
{
partition_ranges = None;
}
// TODO(discord9): do this in the logical plan instead, as it's less buggy there
// Collects alias of the time index column.
if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
@@ -194,6 +203,14 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
}
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
// `PerSeries` distribution is not supported in windowed sort.
if region_scan_exec.distribution()
== Some(store_api::storage::TimeSeriesDistribution::PerSeries)
{
partition_ranges = None;
return Ok(Transformed::no(plan));
}
partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
// Reset time index column.
time_index = HashSet::from([region_scan_exec.time_index()]);
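
The new guard is an allow-list over concrete node types: if anything other than a projection, filter, or batch-coalescing node sits between the sort and the region scan, `partition_ranges` is cleared and the windowed-sort rewrite is skipped. A generic sketch of the `Any`-based allow-list pattern (stand-in types, not DataFusion's):

use std::any::Any;

// Stand-ins for execution plan nodes (illustrative only).
struct ProjectionExec;
struct FilterExec;
struct RepartitionExec;

// Only node types known to preserve per-partition ordering pass the check.
fn preserves_windowed_sort(plan: &dyn Any) -> bool {
    plan.is::<ProjectionExec>() || plan.is::<FilterExec>()
}

fn main() {
    assert!(preserves_windowed_sort(&ProjectionExec));
    // A repartition reshuffles rows across partitions, so it is rejected.
    assert!(!preserves_windowed_sort(&RepartitionExec));
}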


@@ -96,9 +96,10 @@ impl PartSortExec {
if partition >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
"Partition index out of range: {} >= {} at {}",
partition,
self.partition_ranges.len()
self.partition_ranges.len(),
snafu::location!()
)?;
}
@@ -322,9 +323,10 @@ impl PartSortStream {
) -> datafusion_common::Result<()> {
if self.cur_part_idx >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
"Partition index out of range: {} >= {} at {}",
self.cur_part_idx,
self.partition_ranges.len()
self.partition_ranges.len(),
snafu::location!()
)?;
}
let cur_range = self.partition_ranges[self.cur_part_idx];
@@ -355,9 +357,10 @@ impl PartSortStream {
// check if the current partition index is out of range
if self.cur_part_idx >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
"Partition index out of range: {} >= {} at {}",
self.cur_part_idx,
self.partition_ranges.len()
self.partition_ranges.len(),
snafu::location!()
)?;
}
let cur_range = self.partition_ranges[self.cur_part_idx];
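
`snafu::location!()`, used in the new messages, expands to a `snafu::Location` capturing the file, line, and column of the call site, so the report now says which of the three out-of-range checks fired. A tiny sketch (assumes a `snafu` dependency; `Location` implements `Display`):

fn main() {
    let loc = snafu::location!();
    // Prints e.g. "Partition index out of range: 5 >= 3 at src/main.rs:2:15"
    println!("Partition index out of range: {} >= {} at {}", 5, 3, loc);
}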


@@ -48,6 +48,7 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-session.workspace = true
common-sql.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version = { workspace = true, features = ["codec"] }


@@ -17,6 +17,7 @@ use std::time::Duration;
use chrono::NaiveDate;
use common_query::prelude::ScalarValue;
use common_sql::convert::sql_value_to_value;
use common_time::Timestamp;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::LogicalPlan;
@@ -27,7 +28,6 @@ use itertools::Itertools;
use opensrv_mysql::{to_naive_datetime, ParamValue, ValueInner};
use snafu::ResultExt;
use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use crate::error::{self, DataFusionSnafu, Result};


@@ -19,6 +19,7 @@ common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-sql.workspace = true
common-time.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true


@@ -17,16 +17,13 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::DataFusionError;
use datafusion_sql::sqlparser::ast::UnaryOperator;
use datatypes::prelude::{ConcreteDataType, Value};
use snafu::{Location, Snafu};
use sqlparser::ast::Ident;
use sqlparser::parser::ParserError;
use crate::ast::{Expr, Value as SqlValue};
use crate::ast::Expr;
use crate::parsers::error::TQLError;
pub type Result<T> = std::result::Result<T, Error>;
@@ -59,18 +56,6 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Unsupported expr in default constraint: {:?} for column: {}",
expr,
column_name
))]
UnsupportedDefaultValue {
column_name: String,
expr: Expr,
#[snafu(implicit)]
location: Location,
},
// Syntax error from sql parser.
#[snafu(display("Invalid SQL syntax"))]
Syntax {
@@ -218,30 +203,6 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display("Failed to cast SQL value {} to datatype {}", sql_value, datatype))]
InvalidCast {
sql_value: sqlparser::ast::Value,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Invalid unary operator {} for value {}", unary_op, value))]
InvalidUnaryOp {
unary_op: UnaryOperator,
value: Value,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported unary operator {}", unary_op))]
UnsupportedUnaryOp {
unary_op: UnaryOperator,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unrecognized table option key: {}", key))]
InvalidTableOption {
key: String,
@@ -271,25 +232,6 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display("Invalid sql value: {}", value))]
InvalidSqlValue {
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Converting timestamp {:?} to unit {:?} overflow",
timestamp,
target_unit
))]
TimestampOverflow {
timestamp: Timestamp,
target_unit: TimeUnit,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unable to convert statement {} to DataFusion statement", statement))]
ConvertToDfStatement {
statement: String,
@@ -297,14 +239,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Unable to convert sql value {} to datatype {:?}", value, datatype))]
ConvertSqlValue {
value: SqlValue,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unable to convert value {} to sql value", value))]
ConvertValue {
value: Value,
@@ -354,13 +288,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Datatype error: {}", source))]
Datatype {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Invalid partition number: {}, should be in range [2, 65536]",
partition_num
@@ -371,14 +298,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Unable to convert {} to datatype {:?}", value, datatype))]
ConvertStr {
value: String,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "enterprise")]
#[snafu(display("Missing `{}` clause", name))]
MissingClause {
@@ -410,6 +329,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Sql common error"))]
SqlCommon {
source: common_sql::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -417,7 +343,7 @@ impl ErrorExt for Error {
use Error::*;
match self {
UnsupportedDefaultValue { .. } | Unsupported { .. } => StatusCode::Unsupported,
Unsupported { .. } => StatusCode::Unsupported,
Unexpected { .. }
| Syntax { .. }
| TQLSyntax { .. }
@@ -441,17 +367,11 @@ impl ErrorExt for Error {
| InvalidTableName { .. }
| InvalidFlowName { .. }
| InvalidFlowQuery { .. }
| InvalidSqlValue { .. }
| TimestampOverflow { .. }
| InvalidTableOption { .. }
| InvalidCast { .. }
| ConvertToLogicalExpression { .. }
| Simplification { .. }
| InvalidInterval { .. }
| InvalidUnaryOp { .. }
| InvalidPartitionNumber { .. }
| UnsupportedUnaryOp { .. }
| ConvertStr { .. } => StatusCode::InvalidArguments,
| InvalidPartitionNumber { .. } => StatusCode::InvalidArguments,
#[cfg(feature = "enterprise")]
InvalidTriggerName { .. } => StatusCode::InvalidArguments,
@@ -463,9 +383,9 @@ impl ErrorExt for Error {
SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
ConvertToGrpcDataType { source, .. } => source.status_code(),
Datatype { source, .. } => source.status_code(),
SqlCommon { source, .. } => source.status_code(),
ConvertToDfStatement { .. } => StatusCode::Internal,
ConvertSqlValue { .. } | ConvertValue { .. } => StatusCode::Unsupported,
ConvertValue { .. } => StatusCode::Unsupported,
PermissionDenied { .. } => StatusCode::PermissionDenied,
SetFulltextOption { .. } | SetSkippingIndexOption { .. } => StatusCode::Unexpected,

File diff suppressed because it is too large


@@ -233,3 +233,65 @@ DROP TABLE lightning;
Affected Rows: 0
CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
`greptime_timestamp` TIMESTAMP(3) NOT NULL,
`greptime_value` DOUBLE NULL,
`instance` STRING NULL,
`job` STRING NULL,
TIME INDEX (`greptime_timestamp`),
PRIMARY KEY (`instance`, `job`)
);
Affected Rows: 0
INSERT INTO `instance_job_metrics` VALUES
('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
Affected Rows: 3
TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+---------------------+------------------------------------------+
| greptime_timestamp | sum(instance_job_metrics.greptime_value) |
+---------------------+------------------------------------------+
| 2023-10-01T00:00:01 | 1696118400.0 |
| 2023-10-01T00:00:02 | 3392236800.0 |
| 2023-10-01T00:00:03 | 5088355200.0 |
| 2023-10-01T00:00:04 | 5088355200.0 |
| 2023-10-01T00:00:05 | 5088355200.0 |
+---------------------+------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_PromInstantManipulateExec: range=[1696118400000..1696118405000], lookback=[300000], interval=[1000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["instance", "job"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
DROP TABLE IF EXISTS `instance_job_metrics`;
Affected Rows: 0


@@ -120,3 +120,30 @@ ORDER BY
true_collect_time DESC;
DROP TABLE lightning;
CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
`greptime_timestamp` TIMESTAMP(3) NOT NULL,
`greptime_value` DOUBLE NULL,
`instance` STRING NULL,
`job` STRING NULL,
TIME INDEX (`greptime_timestamp`),
PRIMARY KEY (`instance`, `job`)
);
INSERT INTO `instance_job_metrics` VALUES
('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
DROP TABLE IF EXISTS `instance_job_metrics`;


@@ -234,3 +234,57 @@ drop table test;
Affected Rows: 0
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
Affected Rows: 0
TQL EVAL sum(test2);
++
++
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([greptime_timestamp@0], 20), input_partitions=20 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
DROP TABLE test2;
Affected Rows: 0


@@ -95,3 +95,27 @@ TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;
TQL ANALYZE FORMAT TEXT (0, 10, '5s') test;
drop table test;
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
TQL EVAL sum(test2);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
DROP TABLE test2;