mirror of https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 23:49:58 +00:00

Compare commits: fix-dist-p ... refactor/e (1 commit, 55e5c7d2ff)

Cargo.lock (generated): 21 changed lines
@@ -2668,24 +2668,6 @@ dependencies = [
 "strum 0.27.1",
]

[[package]]
name = "common-sql"
version = "0.16.0"
dependencies = [
 "common-base",
 "common-datasource",
 "common-decimal",
 "common-error",
 "common-macro",
 "common-time",
 "datafusion-sql",
 "datatypes",
 "hex",
 "jsonb",
 "snafu 0.8.5",
 "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
]

[[package]]
name = "common-telemetry"
version = "0.16.0"
@@ -8475,7 +8457,6 @@ dependencies = [
 "common-query",
 "common-recordbatch",
 "common-runtime",
 "common-sql",
 "common-telemetry",
 "common-test-util",
 "common-time",
@@ -11262,7 +11243,6 @@ dependencies = [
 "common-recordbatch",
 "common-runtime",
 "common-session",
 "common-sql",
 "common-telemetry",
 "common-test-util",
 "common-time",
@@ -11701,7 +11681,6 @@ dependencies = [
 "common-error",
 "common-macro",
 "common-query",
 "common-sql",
 "common-time",
 "datafusion",
 "datafusion-common",

@@ -30,7 +30,6 @@ members = [
    "src/common/recordbatch",
    "src/common/runtime",
    "src/common/session",
    "src/common/sql",
    "src/common/stat",
    "src/common/substrait",
    "src/common/telemetry",
@@ -263,7 +262,6 @@ common-query = { path = "src/common/query" }
common-recordbatch = { path = "src/common/recordbatch" }
common-runtime = { path = "src/common/runtime" }
common-session = { path = "src/common/session" }
common-sql = { path = "src/common/sql" }
common-telemetry = { path = "src/common/telemetry" }
common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }

@@ -1,22 +0,0 @@
[package]
name = "common-sql"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
common-base.workspace = true
common-datasource.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-time.workspace = true
datafusion-sql.workspace = true
datatypes.workspace = true
hex = "0.4"
jsonb.workspace = true
snafu.workspace = true
sqlparser.workspace = true

[lints]
workspace = true
File diff suppressed because it is too large
@@ -1,182 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_time::timezone::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::ColumnDefaultConstraint;
pub use sqlparser::ast::{
    visit_expressions_mut, visit_statements_mut, BinaryOperator, ColumnDef, ColumnOption,
    ColumnOptionDef, DataType, Expr, Function, FunctionArg, FunctionArgExpr, FunctionArguments,
    Ident, ObjectName, SqlOption, TableConstraint, TimezoneInfo, UnaryOperator, Value as SqlValue,
    Visit, VisitMut, Visitor, VisitorMut,
};

use crate::convert::{sql_number_to_value, sql_value_to_value};
use crate::error::{Result, UnsupportedDefaultValueSnafu};

pub fn parse_column_default_constraint(
    column_name: &str,
    data_type: &ConcreteDataType,
    opts: &[ColumnOptionDef],
    timezone: Option<&Timezone>,
) -> Result<Option<ColumnDefaultConstraint>> {
    if let Some(opt) = opts
        .iter()
        .find(|o| matches!(o.option, ColumnOption::Default(_)))
    {
        let default_constraint = match &opt.option {
            ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value(
                sql_value_to_value(column_name, data_type, v, timezone, None, false)?,
            ),
            ColumnOption::Default(Expr::Function(func)) => {
                let mut func = format!("{func}").to_lowercase();
                // normalize CURRENT_TIMESTAMP to CURRENT_TIMESTAMP()
                if func == CURRENT_TIMESTAMP {
                    func = CURRENT_TIMESTAMP_FN.to_string();
                }
                // Always use lowercase for function expression
                ColumnDefaultConstraint::Function(func.to_lowercase())
            }

            ColumnOption::Default(Expr::UnaryOp { op, expr }) => {
                // Specialized process for handling numerical inputs to prevent
                // overflow errors during the parsing of negative numbers,
                // See https://github.com/GreptimeTeam/greptimedb/issues/4351
                if let (UnaryOperator::Minus, Expr::Value(SqlValue::Number(n, _))) =
                    (op, expr.as_ref())
                {
                    return Ok(Some(ColumnDefaultConstraint::Value(sql_number_to_value(
                        data_type,
                        &format!("-{n}"),
                    )?)));
                }

                if let Expr::Value(v) = &**expr {
                    let value =
                        sql_value_to_value(column_name, data_type, v, timezone, Some(*op), false)?;
                    ColumnDefaultConstraint::Value(value)
                } else {
                    return UnsupportedDefaultValueSnafu {
                        column_name,
                        expr: *expr.clone(),
                    }
                    .fail();
                }
            }
            ColumnOption::Default(others) => {
                return UnsupportedDefaultValueSnafu {
                    column_name,
                    expr: others.clone(),
                }
                .fail();
            }
            _ => {
                return UnsupportedDefaultValueSnafu {
                    column_name,
                    expr: Expr::Value(SqlValue::Null),
                }
                .fail();
            }
        };

        Ok(Some(default_constraint))
    } else {
        Ok(None)
    }
}

#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;

    use datatypes::prelude::{ConcreteDataType, Value};
    use datatypes::types::BooleanType;

    use super::*;

    #[test]
    pub fn test_parse_column_default_constraint() {
        let bool_value = sqlparser::ast::Value::Boolean(true);

        let opts = vec![
            ColumnOptionDef {
                name: None,
                option: ColumnOption::Default(Expr::Value(bool_value)),
            },
            ColumnOptionDef {
                name: None,
                option: ColumnOption::NotNull,
            },
        ];

        let constraint = parse_column_default_constraint(
            "coll",
            &ConcreteDataType::Boolean(BooleanType),
            &opts,
            None,
        )
        .unwrap();

        assert_matches!(
            constraint,
            Some(ColumnDefaultConstraint::Value(Value::Boolean(true)))
        );

        // Test negative number
        let opts = vec![ColumnOptionDef {
            name: None,
            option: ColumnOption::Default(Expr::UnaryOp {
                op: UnaryOperator::Minus,
                expr: Box::new(Expr::Value(SqlValue::Number("32768".to_string(), false))),
            }),
        }];

        let constraint = parse_column_default_constraint(
            "coll",
            &ConcreteDataType::int16_datatype(),
            &opts,
            None,
        )
        .unwrap();

        assert_matches!(
            constraint,
            Some(ColumnDefaultConstraint::Value(Value::Int16(-32768)))
        );
    }

    #[test]
    fn test_incorrect_default_value_issue_3479() {
        let opts = vec![ColumnOptionDef {
            name: None,
            option: ColumnOption::Default(Expr::Value(SqlValue::Number(
                "0.047318541668048164".into(),
                false,
            ))),
        }];
        let constraint = parse_column_default_constraint(
            "coll",
            &ConcreteDataType::float64_datatype(),
            &opts,
            None,
        )
        .unwrap()
        .unwrap();
        assert_eq!("0.047318541668048164", constraint.to_string());
        let encoded: Vec<u8> = constraint.clone().try_into().unwrap();
        let decoded = ColumnDefaultConstraint::try_from(encoded.as_ref()).unwrap();
        assert_eq!(decoded, constraint);
    }
}
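Note: a minimal standalone sketch (not GreptimeDB code) of why the UnaryOp branch in the deleted file above folds the minus sign into the numeric literal before conversion, as referenced by the comment pointing at issue #4351. Parsing the positive literal first and negating afterwards would overflow for values such as i16::MIN:

    // Hypothetical illustration; only the standard library is used.
    fn main() {
        // i16::MAX is 32767, so the unsigned literal alone does not fit...
        assert!("32768".parse::<i16>().is_err());
        // ...while the sign-prefixed literal is exactly i16::MIN and parses fine,
        // which is why the code formats the value as "-{n}" before converting.
        assert_eq!("-32768".parse::<i16>(), Ok(i16::MIN));
    }
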
@@ -1,158 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_sql::sqlparser::ast::UnaryOperator;
use datatypes::prelude::{ConcreteDataType, Value};
use snafu::{Location, Snafu};
pub use sqlparser::ast::{Expr, Value as SqlValue};

pub type Result<T> = std::result::Result<T, Error>;

/// SQL parser errors.
// Now the error in parser does not contain backtrace to avoid generating backtrace
// every time the parser parses an invalid SQL.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    #[snafu(display(
        "Column {} expect type: {:?}, actual: {:?}",
        column_name,
        expect,
        actual,
    ))]
    ColumnTypeMismatch {
        column_name: String,
        expect: ConcreteDataType,
        actual: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to parse value: {}", msg))]
    ParseSqlValue {
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Unsupported expr in default constraint: {:?} for column: {}",
        expr,
        column_name
    ))]
    UnsupportedDefaultValue {
        column_name: String,
        expr: Expr,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unable to convert sql value {} to datatype {:?}", value, datatype))]
    ConvertSqlValue {
        value: SqlValue,
        datatype: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid sql value: {}", value))]
    InvalidSqlValue {
        value: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unsupported unary operator {}", unary_op))]
    UnsupportedUnaryOp {
        unary_op: UnaryOperator,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid unary operator {} for value {}", unary_op, value))]
    InvalidUnaryOp {
        unary_op: UnaryOperator,
        value: Value,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to cast SQL value {} to datatype {}", sql_value, datatype))]
    InvalidCast {
        sql_value: sqlparser::ast::Value,
        datatype: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
        source: datatypes::error::Error,
    },

    #[snafu(display("Unable to convert {} to datatype {:?}", value, datatype))]
    ConvertStr {
        value: String,
        datatype: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Converting timestamp {:?} to unit {:?} overflow",
        timestamp,
        target_unit
    ))]
    TimestampOverflow {
        timestamp: Timestamp,
        target_unit: TimeUnit,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Datatype error: {}", source))]
    Datatype {
        source: datatypes::error::Error,
        #[snafu(implicit)]
        location: Location,
    },
}

impl ErrorExt for Error {
    fn status_code(&self) -> StatusCode {
        use Error::*;

        match self {
            UnsupportedDefaultValue { .. } => StatusCode::Unsupported,
            ParseSqlValue { .. } => StatusCode::InvalidSyntax,
            ColumnTypeMismatch { .. }
            | InvalidSqlValue { .. }
            | UnsupportedUnaryOp { .. }
            | InvalidUnaryOp { .. }
            | InvalidCast { .. }
            | ConvertStr { .. }
            | TimestampOverflow { .. } => StatusCode::InvalidArguments,
            Datatype { source, .. } => source.status_code(),
            ConvertSqlValue { .. } => StatusCode::Unsupported,
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

@@ -1,19 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(assert_matches)]

pub mod convert;
pub mod default_constraint;
pub mod error;
@@ -86,33 +86,26 @@ impl RegionAliveKeeper {
    /// Add the countdown task for a specific region.
    /// It will be ignored if the task exists.
    pub async fn register_region(&self, region_id: RegionId) {
        if self.find_handle(region_id).await.is_some() {
            return;
        }

        let handle = Arc::new(CountdownTaskHandle::new(
            self.region_server.clone(),
            self.countdown_task_handler_ext.clone(),
            region_id,
        ));

        let should_start = {
            let mut handles = self.tasks.lock().await;
        let mut handles = self.tasks.lock().await;
        let _ = handles.insert(region_id, handle.clone());

            // Check if already exists, return early if so
            if handles.contains_key(&region_id) {
                return;
            }

            // Insert new handle
            handles.insert(region_id, handle.clone());

            // Return whether we should start (check state inside lock)
            self.started.load(Ordering::Relaxed)
        };

        if should_start {
        if self.started.load(Ordering::Relaxed) {
            handle.start(self.heartbeat_interval_millis).await;
            info!("Region alive countdown for region {region_id} is started!");

            info!("Region alive countdown for region {region_id} is started!",);
        } else {
            info!(
                "Region alive countdown for region {region_id} is registered but not started yet!"
                "Region alive countdown for region {region_id} is registered but not started yet!",
            );
        }
    }
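Note: the hunk above interleaves the before and after versions of register_region (the +/- markers were lost in this rendering). A minimal sketch of the general pattern one side of the change follows, using simplified, hypothetical types rather than the real RegionAliveKeeper: decide under the lock whether the freshly registered handle should be started, then perform the start only after the lock has been released.

    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Mutex;

    struct Keeper {
        tasks: Mutex<HashMap<u64, &'static str>>, // region_id -> handle (placeholder)
        started: AtomicBool,
    }

    impl Keeper {
        fn register_region(&self, region_id: u64, handle: &'static str) {
            let should_start = {
                let mut handles = self.tasks.lock().unwrap();
                if handles.contains_key(&region_id) {
                    return; // already registered, nothing to do
                }
                handles.insert(region_id, handle);
                // Snapshot the started flag while still holding the lock.
                self.started.load(Ordering::Relaxed)
            }; // lock is dropped here
            if should_start {
                println!("Region alive countdown for region {region_id} is started!");
            }
        }
    }
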
@@ -35,7 +35,6 @@ common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-sql.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true

@@ -844,13 +844,6 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Sql common error"))]
    SqlCommon {
        source: common_sql::error::Error,
        #[snafu(implicit)]
        location: Location,
    },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -979,7 +972,6 @@ impl ErrorExt for Error {
            Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
            Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
            Error::PathNotFound { .. } => StatusCode::InvalidArguments,
            Error::SqlCommon { source, .. } => source.status_code(),
        }
    }

@@ -22,6 +22,7 @@ use datatypes::schema::{ColumnSchema, SchemaRef};
use partition::manager::PartitionRuleManager;
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements;
use sql::statements::insert::Insert;
use sqlparser::ast::{ObjectName, Value as SqlValue};
use table::metadata::TableInfoRef;
@@ -226,7 +227,7 @@ fn sql_value_to_grpc_value(
                column: column.clone(),
            })?
    } else {
        common_sql::convert::sql_value_to_value(
        statements::sql_value_to_value(
            column,
            &column_schema.data_type,
            sql_val,
@@ -234,7 +235,7 @@ fn sql_value_to_grpc_value(
            None,
            auto_string_to_numeric,
        )
        .context(crate::error::SqlCommonSnafu)?
        .context(ParseSqlSnafu)?
    };

    let grpc_value = value_to_grpc_value(value);
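Note: the call sites in this and the following hunks swap which error enum the conversion failure is wrapped into. A minimal sketch (hypothetical error type and conversion, not the frontend's real Error enum) of the snafu context pattern used at these sites:

    use snafu::{ResultExt, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Failed to parse SQL value"))]
        ParseSqlValue { source: std::num::ParseIntError },
    }

    fn convert(raw: &str) -> Result<i64, Error> {
        // On failure, `.context(...)` wraps the inner error as the `source` of the
        // selected variant; switching the selector is how these hunks move between
        // the SqlCommon wrapper and a parser-level error.
        raw.parse::<i64>().context(ParseSqlValueSnafu)
    }
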
@@ -19,7 +19,6 @@ use common_function::function_registry::FUNCTION_REGISTRY;
use common_query::prelude::TypeSignature;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_sql::convert::sql_value_to_value;
use common_telemetry::tracing;
use common_time::Timezone;
use datatypes::data_type::DataType;
@@ -31,6 +30,7 @@ use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{Expr, FunctionArg, FunctionArgExpr, FunctionArguments, Value as SqlValue};
use sql::statements::admin::Admin;
use sql::statements::sql_value_to_value;

use crate::error::{self, Result};
use crate::statement::StatementExecutor;
@@ -186,7 +186,7 @@ fn values_to_vectors_by_exact_types(
        .zip(exact_types.iter())
        .map(|(value, data_type)| {
            let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
                .context(error::SqlCommonSnafu)?;
                .context(error::ParseSqlValueSnafu)?;

            Ok(value_to_vector(value))
        })

@@ -45,7 +45,6 @@ use common_meta::rpc::ddl::{
};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::Timezone;
use datafusion_common::tree_node::TreeNodeVisitor;
@@ -72,6 +71,7 @@ use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
    CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
@@ -87,10 +87,10 @@ use crate::error::{
    ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
    DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
    FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, Result, SchemaInUseSnafu,
    SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
    ViewAlreadyExistsSnafu,
    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::show::create_partitions_stmt;
@@ -1859,7 +1859,7 @@ fn convert_value(
        unary_op,
        false,
    )
    .context(error::SqlCommonSnafu)
    .context(ParseSqlValueSnafu)
}

#[cfg(test)]

@@ -155,23 +155,7 @@ struct PlanRewriter {
    /// Partition columns of the table in current pass
    partition_cols: Option<Vec<String>>,
    column_requirements: HashSet<Column>,
    /// Whether to expand on next call
    /// This is used to handle the case where a plan is transformed, but need to be expanded from it's
    /// parent node. For example a Aggregate plan is split into two parts in frontend and datanode, and need
    /// to be expanded from the parent node of the Aggregate plan.
    expand_on_next_call: bool,
    /// Expanding on next partial/conditional/transformed commutative plan
    /// This is used to handle the case where a plan is transformed, but still
    /// need to push down as many node as possible before next partial/conditional/transformed commutative
    /// plan. I.e.
    /// ```
    /// Limit:
    ///     Sort:
    /// ```
    /// where `Limit` is partial commutative, and `Sort` is conditional commutative.
    /// In this case, we need to expand the `Limit` plan,
    /// so that we can push down the `Sort` plan as much as possible.
    expand_on_next_part_cond_trans_commutative: bool,
    new_child_plan: Option<LogicalPlan>,
}

@@ -193,38 +177,15 @@ impl PlanRewriter {
        {
            return true;
        }

        if self.expand_on_next_call {
            self.expand_on_next_call = false;
            return true;
        }

        if self.expand_on_next_part_cond_trans_commutative {
            let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
            match comm {
                Commutativity::PartialCommutative => {
                    // a small difference is that for partial commutative, we still need to
                    // expand on next call(so `Limit` can be pushed down)
                    self.expand_on_next_part_cond_trans_commutative = false;
                    self.expand_on_next_call = true;
                }
                Commutativity::ConditionalCommutative(_)
                | Commutativity::TransformedCommutative { .. } => {
                    // for conditional commutative and transformed commutative, we can
                    // expand now
                    self.expand_on_next_part_cond_trans_commutative = false;
                    return true;
                }
                _ => (),
            }
        }

        match Categorizer::check_plan(plan, self.partition_cols.clone()) {
            Commutativity::Commutative => {}
            Commutativity::PartialCommutative => {
                if let Some(plan) = partial_commutative_transformer(plan) {
                    self.update_column_requirements(&plan);
                    self.expand_on_next_part_cond_trans_commutative = true;
                    self.stage.push(plan)
                }
            }
@@ -233,7 +194,6 @@ impl PlanRewriter {
                && let Some(plan) = transformer(plan)
            {
                self.update_column_requirements(&plan);
                self.expand_on_next_part_cond_trans_commutative = true;
                self.stage.push(plan)
            }
        }
@@ -242,7 +202,7 @@ impl PlanRewriter {
            && let Some(transformer_actions) = transformer(plan)
        {
            debug!(
                "PlanRewriter: transformed plan: {:?}\n from {plan}",
                "PlanRewriter: transformed plan: {:#?}\n from {plan}",
                transformer_actions.extra_parent_plans
            );
            if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
@@ -266,10 +226,6 @@ impl PlanRewriter {
    }

    fn update_column_requirements(&mut self, plan: &LogicalPlan) {
        debug!(
            "PlanRewriter: update column requirements for plan: {plan}\n withcolumn_requirements: {:?}",
            self.column_requirements
        );
        let mut container = HashSet::new();
        for expr in plan.expressions() {
            // this method won't fail
@@ -279,10 +235,6 @@ impl PlanRewriter {
        for col in container {
            self.column_requirements.insert(col);
        }
        debug!(
            "PlanRewriter: updated column requirements: {:?}",
            self.column_requirements
        );
    }

    fn is_expanded(&self) -> bool {

@@ -181,15 +181,6 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
            is_batch_coalesced = true;
        }

        // only a very limited set of plans can exist between region scan and sort exec
        // other plans might make this optimize wrong, so be safe here by limiting it
        if !(plan.as_any().is::<ProjectionExec>()
            || plan.as_any().is::<FilterExec>()
            || plan.as_any().is::<CoalesceBatchesExec>())
        {
            partition_ranges = None;
        }

        // TODO(discord9): do this in logical plan instead as it's lessy bugy there
        // Collects alias of the time index column.
        if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
@@ -203,14 +194,6 @@ fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
        }

        if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
            // `PerSeries` distribution is not supported in windowed sort.
            if region_scan_exec.distribution()
                == Some(store_api::storage::TimeSeriesDistribution::PerSeries)
            {
                partition_ranges = None;
                return Ok(Transformed::no(plan));
            }

            partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
            // Reset time index column.
            time_index = HashSet::from([region_scan_exec.time_index()]);

@@ -96,10 +96,9 @@ impl PartSortExec {

        if partition >= self.partition_ranges.len() {
            internal_err!(
                "Partition index out of range: {} >= {} at {}",
                "Partition index out of range: {} >= {}",
                partition,
                self.partition_ranges.len(),
                snafu::location!()
                self.partition_ranges.len()
            )?;
        }

@@ -323,10 +322,9 @@ impl PartSortStream {
    ) -> datafusion_common::Result<()> {
        if self.cur_part_idx >= self.partition_ranges.len() {
            internal_err!(
                "Partition index out of range: {} >= {} at {}",
                "Partition index out of range: {} >= {}",
                self.cur_part_idx,
                self.partition_ranges.len(),
                snafu::location!()
                self.partition_ranges.len()
            )?;
        }
        let cur_range = self.partition_ranges[self.cur_part_idx];
@@ -357,10 +355,9 @@ impl PartSortStream {
        // check if the current partition index is out of range
        if self.cur_part_idx >= self.partition_ranges.len() {
            internal_err!(
                "Partition index out of range: {} >= {} at {}",
                "Partition index out of range: {} >= {}",
                self.cur_part_idx,
                self.partition_ranges.len(),
                snafu::location!()
                self.partition_ranges.len()
            )?;
        }
        let cur_range = self.partition_ranges[self.cur_part_idx];

@@ -48,7 +48,6 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-session.workspace = true
common-sql.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version = { workspace = true, features = ["codec"] }

@@ -17,7 +17,6 @@ use std::time::Duration;

use chrono::NaiveDate;
use common_query::prelude::ScalarValue;
use common_sql::convert::sql_value_to_value;
use common_time::Timestamp;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::LogicalPlan;
@@ -28,6 +27,7 @@ use itertools::Itertools;
use opensrv_mysql::{to_naive_datetime, ParamValue, ValueInner};
use snafu::ResultExt;
use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;

use crate::error::{self, DataFusionSnafu, Result};

@@ -19,7 +19,6 @@ common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-sql.workspace = true
common-time.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true

@@ -17,13 +17,16 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::DataFusionError;
use datafusion_sql::sqlparser::ast::UnaryOperator;
use datatypes::prelude::{ConcreteDataType, Value};
use snafu::{Location, Snafu};
use sqlparser::ast::Ident;
use sqlparser::parser::ParserError;

use crate::ast::Expr;
use crate::ast::{Expr, Value as SqlValue};
use crate::parsers::error::TQLError;

pub type Result<T> = std::result::Result<T, Error>;
@@ -56,6 +59,18 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display(
        "Unsupported expr in default constraint: {:?} for column: {}",
        expr,
        column_name
    ))]
    UnsupportedDefaultValue {
        column_name: String,
        expr: Expr,
        #[snafu(implicit)]
        location: Location,
    },

    // Syntax error from sql parser.
    #[snafu(display("Invalid SQL syntax"))]
    Syntax {
@@ -203,6 +218,30 @@ pub enum Error {
        source: datatypes::error::Error,
    },

    #[snafu(display("Failed to cast SQL value {} to datatype {}", sql_value, datatype))]
    InvalidCast {
        sql_value: sqlparser::ast::Value,
        datatype: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
        source: datatypes::error::Error,
    },

    #[snafu(display("Invalid unary operator {} for value {}", unary_op, value))]
    InvalidUnaryOp {
        unary_op: UnaryOperator,
        value: Value,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unsupported unary operator {}", unary_op))]
    UnsupportedUnaryOp {
        unary_op: UnaryOperator,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unrecognized table option key: {}", key))]
    InvalidTableOption {
        key: String,
@@ -232,6 +271,25 @@ pub enum Error {
        source: api::error::Error,
    },

    #[snafu(display("Invalid sql value: {}", value))]
    InvalidSqlValue {
        value: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Converting timestamp {:?} to unit {:?} overflow",
        timestamp,
        target_unit
    ))]
    TimestampOverflow {
        timestamp: Timestamp,
        target_unit: TimeUnit,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unable to convert statement {} to DataFusion statement", statement))]
    ConvertToDfStatement {
        statement: String,
@@ -239,6 +297,14 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Unable to convert sql value {} to datatype {:?}", value, datatype))]
    ConvertSqlValue {
        value: SqlValue,
        datatype: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unable to convert value {} to sql value", value))]
    ConvertValue {
        value: Value,
@@ -288,6 +354,13 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Datatype error: {}", source))]
    Datatype {
        source: datatypes::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Invalid partition number: {}, should be in range [2, 65536]",
        partition_num
@@ -298,6 +371,14 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Unable to convert {} to datatype {:?}", value, datatype))]
    ConvertStr {
        value: String,
        datatype: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
    },

    #[cfg(feature = "enterprise")]
    #[snafu(display("Missing `{}` clause", name))]
    MissingClause {
@@ -329,13 +410,6 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Sql common error"))]
    SqlCommon {
        source: common_sql::error::Error,
        #[snafu(implicit)]
        location: Location,
    },
}

impl ErrorExt for Error {
@@ -343,7 +417,7 @@ impl ErrorExt for Error {
        use Error::*;

        match self {
            Unsupported { .. } => StatusCode::Unsupported,
            UnsupportedDefaultValue { .. } | Unsupported { .. } => StatusCode::Unsupported,
            Unexpected { .. }
            | Syntax { .. }
            | TQLSyntax { .. }
@@ -367,11 +441,17 @@ impl ErrorExt for Error {
            | InvalidTableName { .. }
            | InvalidFlowName { .. }
            | InvalidFlowQuery { .. }
            | InvalidSqlValue { .. }
            | TimestampOverflow { .. }
            | InvalidTableOption { .. }
            | InvalidCast { .. }
            | ConvertToLogicalExpression { .. }
            | Simplification { .. }
            | InvalidInterval { .. }
            | InvalidPartitionNumber { .. } => StatusCode::InvalidArguments,
            | InvalidUnaryOp { .. }
            | InvalidPartitionNumber { .. }
            | UnsupportedUnaryOp { .. }
            | ConvertStr { .. } => StatusCode::InvalidArguments,

            #[cfg(feature = "enterprise")]
            InvalidTriggerName { .. } => StatusCode::InvalidArguments,
@@ -383,9 +463,9 @@ impl ErrorExt for Error {

            SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
            ConvertToGrpcDataType { source, .. } => source.status_code(),
            SqlCommon { source, .. } => source.status_code(),
            Datatype { source, .. } => source.status_code(),
            ConvertToDfStatement { .. } => StatusCode::Internal,
            ConvertValue { .. } => StatusCode::Unsupported,
            ConvertSqlValue { .. } | ConvertValue { .. } => StatusCode::Unsupported,

            PermissionDenied { .. } => StatusCode::PermissionDenied,
            SetFulltextOption { .. } | SetSkippingIndexOption { .. } => StatusCode::Unexpected,

File diff suppressed because it is too large
@@ -233,65 +233,3 @@ DROP TABLE lightning;

Affected Rows: 0

CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
    `greptime_timestamp` TIMESTAMP(3) NOT NULL,
    `greptime_value` DOUBLE NULL,
    `instance` STRING NULL,
    `job` STRING NULL,
    TIME INDEX (`greptime_timestamp`),
    PRIMARY KEY (`instance`, `job`)
);

Affected Rows: 0

INSERT INTO `instance_job_metrics` VALUES
    ('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
    ('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
    ('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');

Affected Rows: 3

TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);

+---------------------+------------------------------------------+
| greptime_timestamp | sum(instance_job_metrics.greptime_value) |
+---------------------+------------------------------------------+
| 2023-10-01T00:00:01 | 1696118400.0 |
| 2023-10-01T00:00:02 | 3392236800.0 |
| 2023-10-01T00:00:03 | 5088355200.0 |
| 2023-10-01T00:00:04 | 5088355200.0 |
| 2023-10-01T00:00:05 | 5088355200.0 |
+---------------------+------------------------------------------+
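Note: a small worked check (not part of the test fixture) of why the summed series plateaus at 5088355200.0. Every inserted row carries greptime_value = 1696118400.0, and the instant query's lookback (300000 ms in the plan below) keeps earlier samples visible at later steps, so the sum grows with the number of series seen so far:

    fn main() {
        let v = 1696118400.0_f64;
        assert_eq!(v * 2.0, 3392236800.0); // at 00:00:02 two series are visible
        assert_eq!(v * 3.0, 5088355200.0); // from 00:00:03 on, all three series
    }
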
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_PromInstantManipulateExec: range=[1696118400000..1696118405000], lookback=[300000], interval=[1000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["instance", "job"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+

DROP TABLE IF EXISTS `instance_job_metrics`;

Affected Rows: 0

@@ -120,30 +120,3 @@ ORDER BY
    true_collect_time DESC;

DROP TABLE lightning;

CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
    `greptime_timestamp` TIMESTAMP(3) NOT NULL,
    `greptime_value` DOUBLE NULL,
    `instance` STRING NULL,
    `job` STRING NULL,
    TIME INDEX (`greptime_timestamp`),
    PRIMARY KEY (`instance`, `job`)
);

INSERT INTO `instance_job_metrics` VALUES
    ('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
    ('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
    ('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');

TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);

DROP TABLE IF EXISTS `instance_job_metrics`;

@@ -234,57 +234,3 @@ drop table test;

Affected Rows: 0

CREATE TABLE test2 (
    "greptime_timestamp" TIMESTAMP(3) NOT NULL,
    "greptime_value" DOUBLE NULL,
    "shard" STRING NULL INVERTED INDEX,
    TIME INDEX ("greptime_timestamp"),
    PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
    shard <= '2',
    shard > '2'
);

Affected Rows: 0

TQL EVAL sum(test2);

++
++

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([greptime_timestamp@0], 20), input_partitions=20 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+

DROP TABLE test2;

Affected Rows: 0

@@ -95,27 +95,3 @@ TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;
TQL ANALYZE FORMAT TEXT (0, 10, '5s') test;

drop table test;

CREATE TABLE test2 (
    "greptime_timestamp" TIMESTAMP(3) NOT NULL,
    "greptime_value" DOUBLE NULL,
    "shard" STRING NULL INVERTED INDEX,
    TIME INDEX ("greptime_timestamp"),
    PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
    shard <= '2',
    shard > '2'
);

TQL EVAL sum(test2);

-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);

DROP TABLE test2;