Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-08 06:12:55 +00:00.

Compare commits: v0.16.0-ni...fix-dist-p (19 commits)
SHA1:
365421c452
b361c5f50a
5d78bc1efa
99c78b2f97
bdecdb869e
df4cd157e1
60fbe54f90
7c15e71407
13f20b5a40
2b802e45f5
def9b7c01d
af03e89139
e7a64b7dc0
29739b556e
77e50d0e08
c2f1447345
30f7955d2b
3508fddd74
351c741c70
Cargo.lock (generated, 37 lines changed)
@@ -1975,7 +1975,6 @@ dependencies = [
  "common-version",
  "common-wal",
  "datatypes",
- "either",
  "etcd-client",
  "futures",
  "humantime",
@@ -2103,7 +2102,6 @@ dependencies = [
  "common-wal",
  "datanode",
  "datatypes",
- "either",
  "etcd-client",
  "file-engine",
  "flow",
@@ -2670,6 +2668,24 @@ dependencies = [
  "strum 0.27.1",
 ]
 
+[[package]]
+name = "common-sql"
+version = "0.16.0"
+dependencies = [
+ "common-base",
+ "common-datasource",
+ "common-decimal",
+ "common-error",
+ "common-macro",
+ "common-time",
+ "datafusion-sql",
+ "datatypes",
+ "hex",
+ "jsonb",
+ "snafu 0.8.5",
+ "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
+]
+
 [[package]]
 name = "common-telemetry"
 version = "0.16.0"
@@ -4201,9 +4217,9 @@ dependencies = [
 
 [[package]]
 name = "either"
-version = "1.13.0"
+version = "1.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
+checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
 dependencies = [
  "serde",
 ]
@@ -6699,7 +6715,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
 dependencies = [
  "cfg-if",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -7172,6 +7188,7 @@ dependencies = [
  "deadpool",
  "deadpool-postgres",
  "derive_builder 0.20.1",
+ "either",
  "etcd-client",
  "futures",
  "h2 0.3.26",
@@ -7243,6 +7260,7 @@ dependencies = [
  "common-base",
  "common-error",
  "common-macro",
+ "common-meta",
  "common-query",
  "common-recordbatch",
  "common-runtime",
@@ -8457,6 +8475,7 @@ dependencies = [
  "common-query",
  "common-recordbatch",
  "common-runtime",
+ "common-sql",
  "common-telemetry",
  "common-test-util",
  "common-time",
@@ -9594,7 +9613,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
 dependencies = [
  "heck 0.5.0",
- "itertools 0.11.0",
+ "itertools 0.14.0",
  "log",
  "multimap",
  "once_cell",
@@ -9640,7 +9659,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
 dependencies = [
  "anyhow",
- "itertools 0.11.0",
+ "itertools 0.14.0",
  "proc-macro2",
  "quote",
  "syn 2.0.100",
@@ -11243,6 +11262,7 @@ dependencies = [
  "common-recordbatch",
  "common-runtime",
  "common-session",
+ "common-sql",
  "common-telemetry",
  "common-test-util",
  "common-time",
@@ -11681,6 +11701,7 @@ dependencies = [
  "common-error",
  "common-macro",
  "common-query",
+ "common-sql",
  "common-time",
  "datafusion",
  "datafusion-common",
@@ -14266,7 +14287,7 @@ version = "0.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
 dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -30,6 +30,7 @@ members = [
     "src/common/recordbatch",
     "src/common/runtime",
     "src/common/session",
+    "src/common/sql",
     "src/common/stat",
     "src/common/substrait",
     "src/common/telemetry",
@@ -133,6 +134,7 @@ deadpool = "0.12"
 deadpool-postgres = "0.14"
 derive_builder = "0.20"
 dotenv = "0.15"
+either = "1.15"
 etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
@@ -261,6 +263,7 @@ common-query = { path = "src/common/query" }
 common-recordbatch = { path = "src/common/recordbatch" }
 common-runtime = { path = "src/common/runtime" }
 common-session = { path = "src/common/session" }
+common-sql = { path = "src/common/sql" }
 common-telemetry = { path = "src/common/telemetry" }
 common-test-util = { path = "src/common/test-util" }
 common-time = { path = "src/common/time" }
@@ -43,7 +43,6 @@ common-time.workspace = true
 common-version.workspace = true
 common-wal.workspace = true
 datatypes.workspace = true
-either = "1.8"
 etcd-client.workspace = true
 futures.workspace = true
 humantime.workspace = true
@@ -52,7 +52,6 @@ common-version.workspace = true
 common-wal.workspace = true
 datanode.workspace = true
 datatypes.workspace = true
-either = "1.8"
 etcd-client.workspace = true
 file-engine.workspace = true
 flow.workspace = true
@@ -102,7 +102,7 @@ impl App for Instance {
 #[derive(Parser)]
 pub struct Command {
     #[clap(subcommand)]
-    subcmd: SubCommand,
+    pub subcmd: SubCommand,
 }
 
 impl Command {
@@ -116,7 +116,7 @@ impl Command {
 }
 
 #[derive(Parser)]
-enum SubCommand {
+pub enum SubCommand {
     Start(StartCommand),
 }
 
@@ -153,7 +153,7 @@ pub struct StartCommand {
     #[clap(long)]
     postgres_addr: Option<String>,
     #[clap(short, long)]
-    config_file: Option<String>,
+    pub config_file: Option<String>,
     #[clap(short, long)]
     influxdb_enable: Option<bool>,
     #[clap(long, value_delimiter = ',', num_args = 1..)]
@@ -169,7 +169,7 @@ pub struct StartCommand {
     #[clap(long)]
     disable_dashboard: Option<bool>,
     #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
-    env_prefix: String,
+    pub env_prefix: String,
 }
 
 impl StartCommand {
@@ -54,6 +54,10 @@ impl Instance {
     pub fn get_inner(&self) -> &MetasrvInstance {
         &self.instance
     }
+
+    pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
+        &mut self.instance
+    }
 }
 
 #[async_trait]
src/common/sql/Cargo.toml (new file, 22 lines)
@@ -0,0 +1,22 @@
[package]
name = "common-sql"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
common-base.workspace = true
common-datasource.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-time.workspace = true
datafusion-sql.workspace = true
datatypes.workspace = true
hex = "0.4"
jsonb.workspace = true
snafu.workspace = true
sqlparser.workspace = true

[lints]
workspace = true
src/common/sql/src/convert.rs (new file, 1084 lines) — file diff suppressed because it is too large.
src/common/sql/src/default_constraint.rs (new file, 182 lines)
@@ -0,0 +1,182 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_time::timezone::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::ColumnDefaultConstraint;
pub use sqlparser::ast::{
    visit_expressions_mut, visit_statements_mut, BinaryOperator, ColumnDef, ColumnOption,
    ColumnOptionDef, DataType, Expr, Function, FunctionArg, FunctionArgExpr, FunctionArguments,
    Ident, ObjectName, SqlOption, TableConstraint, TimezoneInfo, UnaryOperator, Value as SqlValue,
    Visit, VisitMut, Visitor, VisitorMut,
};

use crate::convert::{sql_number_to_value, sql_value_to_value};
use crate::error::{Result, UnsupportedDefaultValueSnafu};

pub fn parse_column_default_constraint(
    column_name: &str,
    data_type: &ConcreteDataType,
    opts: &[ColumnOptionDef],
    timezone: Option<&Timezone>,
) -> Result<Option<ColumnDefaultConstraint>> {
    if let Some(opt) = opts
        .iter()
        .find(|o| matches!(o.option, ColumnOption::Default(_)))
    {
        let default_constraint = match &opt.option {
            ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value(
                sql_value_to_value(column_name, data_type, v, timezone, None, false)?,
            ),
            ColumnOption::Default(Expr::Function(func)) => {
                let mut func = format!("{func}").to_lowercase();
                // normalize CURRENT_TIMESTAMP to CURRENT_TIMESTAMP()
                if func == CURRENT_TIMESTAMP {
                    func = CURRENT_TIMESTAMP_FN.to_string();
                }
                // Always use lowercase for function expression
                ColumnDefaultConstraint::Function(func.to_lowercase())
            }

            ColumnOption::Default(Expr::UnaryOp { op, expr }) => {
                // Specialized process for handling numerical inputs to prevent
                // overflow errors during the parsing of negative numbers,
                // See https://github.com/GreptimeTeam/greptimedb/issues/4351
                if let (UnaryOperator::Minus, Expr::Value(SqlValue::Number(n, _))) =
                    (op, expr.as_ref())
                {
                    return Ok(Some(ColumnDefaultConstraint::Value(sql_number_to_value(
                        data_type,
                        &format!("-{n}"),
                    )?)));
                }

                if let Expr::Value(v) = &**expr {
                    let value =
                        sql_value_to_value(column_name, data_type, v, timezone, Some(*op), false)?;
                    ColumnDefaultConstraint::Value(value)
                } else {
                    return UnsupportedDefaultValueSnafu {
                        column_name,
                        expr: *expr.clone(),
                    }
                    .fail();
                }
            }
            ColumnOption::Default(others) => {
                return UnsupportedDefaultValueSnafu {
                    column_name,
                    expr: others.clone(),
                }
                .fail();
            }
            _ => {
                return UnsupportedDefaultValueSnafu {
                    column_name,
                    expr: Expr::Value(SqlValue::Null),
                }
                .fail();
            }
        };

        Ok(Some(default_constraint))
    } else {
        Ok(None)
    }
}

#[cfg(test)]
mod test {
    use std::assert_matches::assert_matches;

    use datatypes::prelude::{ConcreteDataType, Value};
    use datatypes::types::BooleanType;

    use super::*;

    #[test]
    pub fn test_parse_column_default_constraint() {
        let bool_value = sqlparser::ast::Value::Boolean(true);

        let opts = vec![
            ColumnOptionDef {
                name: None,
                option: ColumnOption::Default(Expr::Value(bool_value)),
            },
            ColumnOptionDef {
                name: None,
                option: ColumnOption::NotNull,
            },
        ];

        let constraint = parse_column_default_constraint(
            "coll",
            &ConcreteDataType::Boolean(BooleanType),
            &opts,
            None,
        )
        .unwrap();

        assert_matches!(
            constraint,
            Some(ColumnDefaultConstraint::Value(Value::Boolean(true)))
        );

        // Test negative number
        let opts = vec![ColumnOptionDef {
            name: None,
            option: ColumnOption::Default(Expr::UnaryOp {
                op: UnaryOperator::Minus,
                expr: Box::new(Expr::Value(SqlValue::Number("32768".to_string(), false))),
            }),
        }];

        let constraint = parse_column_default_constraint(
            "coll",
            &ConcreteDataType::int16_datatype(),
            &opts,
            None,
        )
        .unwrap();

        assert_matches!(
            constraint,
            Some(ColumnDefaultConstraint::Value(Value::Int16(-32768)))
        );
    }

    #[test]
    fn test_incorrect_default_value_issue_3479() {
        let opts = vec![ColumnOptionDef {
            name: None,
            option: ColumnOption::Default(Expr::Value(SqlValue::Number(
                "0.047318541668048164".into(),
                false,
            ))),
        }];
        let constraint = parse_column_default_constraint(
            "coll",
            &ConcreteDataType::float64_datatype(),
            &opts,
            None,
        )
        .unwrap()
        .unwrap();
        assert_eq!("0.047318541668048164", constraint.to_string());
        let encoded: Vec<u8> = constraint.clone().try_into().unwrap();
        let decoded = ColumnDefaultConstraint::try_from(encoded.as_ref()).unwrap();
        assert_eq!(decoded, constraint);
    }
}
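Note on the `format!("-{n}")` trick above: the magnitude of a signed type's most negative value overflows if parsed on its own and negated afterwards, which is exactly the bug referenced in issue #4351. A minimal std-only sketch of the idea (plain `i16`, none of GreptimeDB's types):

```rust
fn main() {
    // Parsing the magnitude alone fails: 32768 > i16::MAX (32767).
    assert!("32768".parse::<i16>().is_err());

    // Re-attaching the minus sign before parsing succeeds,
    // because i16::MIN is exactly -32768.
    let n = "32768";
    let v: i16 = format!("-{n}").parse().unwrap();
    assert_eq!(v, i16::MIN);
}
```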
src/common/sql/src/error.rs (new file, 158 lines)
@@ -0,0 +1,158 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_sql::sqlparser::ast::UnaryOperator;
use datatypes::prelude::{ConcreteDataType, Value};
use snafu::{Location, Snafu};
pub use sqlparser::ast::{Expr, Value as SqlValue};

pub type Result<T> = std::result::Result<T, Error>;

/// SQL parser errors.
// Now the error in parser does not contain backtrace to avoid generating backtrace
// every time the parser parses an invalid SQL.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
    #[snafu(display(
        "Column {} expect type: {:?}, actual: {:?}",
        column_name,
        expect,
        actual,
    ))]
    ColumnTypeMismatch {
        column_name: String,
        expect: ConcreteDataType,
        actual: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to parse value: {}", msg))]
    ParseSqlValue {
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Unsupported expr in default constraint: {:?} for column: {}",
        expr,
        column_name
    ))]
    UnsupportedDefaultValue {
        column_name: String,
        expr: Expr,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unable to convert sql value {} to datatype {:?}", value, datatype))]
    ConvertSqlValue {
        value: SqlValue,
        datatype: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid sql value: {}", value))]
    InvalidSqlValue {
        value: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Unsupported unary operator {}", unary_op))]
    UnsupportedUnaryOp {
        unary_op: UnaryOperator,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Invalid unary operator {} for value {}", unary_op, value))]
    InvalidUnaryOp {
        unary_op: UnaryOperator,
        value: Value,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to cast SQL value {} to datatype {}", sql_value, datatype))]
    InvalidCast {
        sql_value: sqlparser::ast::Value,
        datatype: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
        source: datatypes::error::Error,
    },

    #[snafu(display("Unable to convert {} to datatype {:?}", value, datatype))]
    ConvertStr {
        value: String,
        datatype: ConcreteDataType,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display(
        "Converting timestamp {:?} to unit {:?} overflow",
        timestamp,
        target_unit
    ))]
    TimestampOverflow {
        timestamp: Timestamp,
        target_unit: TimeUnit,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Datatype error: {}", source))]
    Datatype {
        source: datatypes::error::Error,
        #[snafu(implicit)]
        location: Location,
    },
}

impl ErrorExt for Error {
    fn status_code(&self) -> StatusCode {
        use Error::*;

        match self {
            UnsupportedDefaultValue { .. } => StatusCode::Unsupported,
            ParseSqlValue { .. } => StatusCode::InvalidSyntax,
            ColumnTypeMismatch { .. }
            | InvalidSqlValue { .. }
            | UnsupportedUnaryOp { .. }
            | InvalidUnaryOp { .. }
            | InvalidCast { .. }
            | ConvertStr { .. }
            | TimestampOverflow { .. } => StatusCode::InvalidArguments,
            Datatype { source, .. } => source.status_code(),
            ConvertSqlValue { .. } => StatusCode::Unsupported,
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
src/common/sql/src/lib.rs (new file, 19 lines)
@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(assert_matches)]

pub mod convert;
pub mod default_constraint;
pub mod error;
@@ -86,26 +86,33 @@ impl RegionAliveKeeper {
     /// Add the countdown task for a specific region.
     /// It will be ignored if the task exists.
     pub async fn register_region(&self, region_id: RegionId) {
         if self.find_handle(region_id).await.is_some() {
             return;
         }
 
         let handle = Arc::new(CountdownTaskHandle::new(
             self.region_server.clone(),
             self.countdown_task_handler_ext.clone(),
             region_id,
         ));
 
-        let mut handles = self.tasks.lock().await;
-        let _ = handles.insert(region_id, handle.clone());
+        let should_start = {
+            let mut handles = self.tasks.lock().await;
 
-        if self.started.load(Ordering::Relaxed) {
+            // Check if already exists, return early if so
+            if handles.contains_key(&region_id) {
+                return;
+            }
+
+            // Insert new handle
+            handles.insert(region_id, handle.clone());
+
+            // Return whether we should start (check state inside lock)
+            self.started.load(Ordering::Relaxed)
+        };
+
+        if should_start {
             handle.start(self.heartbeat_interval_millis).await;
 
-            info!("Region alive countdown for region {region_id} is started!",);
+            info!("Region alive countdown for region {region_id} is started!");
         } else {
             info!(
-                "Region alive countdown for region {region_id} is registered but not started yet!",
+                "Region alive countdown for region {region_id} is registered but not started yet!"
             );
         }
     }
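The rewritten `register_region` closes a check-then-insert race: the existence check, the insert, and the read of the `started` flag now all happen under a single lock acquisition, while the slow `start` call stays outside the critical section. A std-only sketch of the pattern (the `Keeper` type and `String` handle are illustrative stand-ins, not GreptimeDB's types):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct Keeper {
    tasks: Mutex<HashMap<u64, String>>,
    started: AtomicBool,
}

impl Keeper {
    fn register(&self, region_id: u64, handle: String) {
        // Decide everything while holding the lock so that no concurrent
        // caller can slip in between the check and the insert.
        let should_start = {
            let mut tasks = self.tasks.lock().unwrap();
            if tasks.contains_key(&region_id) {
                return; // already registered
            }
            tasks.insert(region_id, handle);
            self.started.load(Ordering::Relaxed)
        }; // lock released here, before any slow start-up work

        if should_start {
            println!("countdown for region {region_id} started");
        }
    }
}

fn main() {
    let keeper = Keeper {
        tasks: Mutex::new(HashMap::new()),
        started: AtomicBool::new(true),
    };
    keeper.register(1, "handle".to_string());
    keeper.register(1, "duplicate".to_string()); // ignored under the same lock
}
```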
@@ -45,6 +45,7 @@ datatypes.workspace = true
 deadpool = { workspace = true, optional = true }
 deadpool-postgres = { workspace = true, optional = true }
 derive_builder.workspace = true
+either.workspace = true
 etcd-client.workspace = true
 futures.workspace = true
 h2 = "0.3"
@@ -34,6 +34,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
 use common_telemetry::info;
 #[cfg(feature = "pg_kvbackend")]
 use deadpool_postgres::{Config, Runtime};
+use either::Either;
 use etcd_client::Client;
 use servers::configurator::ConfiguratorRef;
 use servers::export_metrics::ExportMetricsTask;
@@ -76,7 +77,7 @@ use crate::{error, Result};
 pub struct MetasrvInstance {
     metasrv: Arc<Metasrv>,
 
-    http_server: HttpServer,
+    http_server: Either<Option<HttpServerBuilder>, HttpServer>,
 
     opts: MetasrvOptions,
 
@@ -99,10 +100,9 @@ impl MetasrvInstance {
         plugins: Plugins,
         metasrv: Metasrv,
     ) -> Result<MetasrvInstance> {
-        let http_server = HttpServerBuilder::new(opts.http.clone())
+        let builder = HttpServerBuilder::new(opts.http.clone())
             .with_metrics_handler(MetricsHandler)
-            .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?)
-            .build();
+            .with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
 
         let metasrv = Arc::new(metasrv);
         // put metasrv into plugins for later use
@@ -111,7 +111,7 @@ impl MetasrvInstance {
             .context(error::InitExportMetricsTaskSnafu)?;
         Ok(MetasrvInstance {
             metasrv,
-            http_server,
+            http_server: Either::Left(Some(builder)),
             opts,
             signal_sender: None,
             plugins,
@@ -122,6 +122,25 @@ impl MetasrvInstance {
     }
 
     pub async fn start(&mut self) -> Result<()> {
+        if let Some(builder) = self.http_server.as_mut().left()
+            && let Some(builder) = builder.take()
+        {
+            let mut server = builder.build();
+
+            let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
+                addr: &self.opts.http.addr,
+            })?;
+            info!("starting http server at {}", addr);
+            server.start(addr).await.context(error::StartHttpSnafu)?;
+
+            self.http_server = Either::Right(server);
+        } else {
+            // If the http server builder is not present, the Metasrv's "start" must have been
+            // called already, regardless of whether that startup succeeded. Return an `Ok` here
+            // for simplicity.
+            return Ok(());
+        };
+
         self.metasrv.try_start().await?;
 
         if let Some(t) = self.export_metrics_task.as_ref() {
@@ -144,14 +163,6 @@ impl MetasrvInstance {
             .await?;
         self.bind_addr = Some(socket_addr);
 
-        let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
-            addr: &self.opts.http.addr,
-        })?;
-        self.http_server
-            .start(addr)
-            .await
-            .context(error::StartHttpSnafu)?;
-
         *self.serve_state.lock().await = Some(serve_state_rx);
         Ok(())
     }
@@ -169,12 +180,15 @@ impl MetasrvInstance {
                 .context(error::SendShutdownSignalSnafu)?;
         }
         self.metasrv.shutdown().await?;
-        self.http_server
-            .shutdown()
-            .await
-            .context(error::ShutdownServerSnafu {
-                server: self.http_server.name(),
-            })?;
-
+        if let Some(http_server) = self.http_server.as_ref().right() {
+            http_server
+                .shutdown()
+                .await
+                .context(error::ShutdownServerSnafu {
+                    server: http_server.name(),
+                })?;
+        }
         Ok(())
     }
 
@@ -188,6 +202,14 @@ impl MetasrvInstance {
     pub fn bind_addr(&self) -> &Option<SocketAddr> {
         &self.bind_addr
     }
+
+    pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
+        &mut self.http_server
+    }
+
+    pub fn http_server(&self) -> Option<&HttpServer> {
+        self.http_server.as_ref().right()
+    }
 }
 
 pub async fn bootstrap_metasrv_with_router(
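`Either<Option<HttpServerBuilder>, HttpServer>` is effectively a two-state machine: `Left(Some(builder))` before startup, `Right(server)` after, with `Option::take` making a second `start` a no-op. A stripped-down sketch of the same idea using a plain enum instead of the `either` crate (`SrvBuilder`/`Srv` are made-up stand-ins):

```rust
struct SrvBuilder;
struct Srv;

impl SrvBuilder {
    fn build(self) -> Srv {
        Srv
    }
}

// Models Either<Option<HttpServerBuilder>, HttpServer> from the diff.
enum HttpState {
    NotStarted(Option<SrvBuilder>), // Either::Left(Some(builder))
    Running(Srv),                   // Either::Right(server)
}

struct Instance {
    http: HttpState,
}

impl Instance {
    fn start(&mut self) {
        // Take the builder out; repeated calls find None (or Running)
        // and return early, mirroring the early `Ok(())` in the diff.
        let builder = match &mut self.http {
            HttpState::NotStarted(b) => b.take(),
            HttpState::Running(_) => None,
        };
        if let Some(builder) = builder {
            let server = builder.build();
            // ... bind the address and serve here ...
            self.http = HttpState::Running(server);
        }
    }
}

fn main() {
    let mut instance = Instance {
        http: HttpState::NotStarted(Some(SrvBuilder)),
    };
    instance.start();
    instance.start(); // idempotent: nothing left to build
    assert!(matches!(instance.http, HttpState::Running(_)));
}
```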
@@ -15,6 +15,7 @@
 #![feature(result_flattening)]
 #![feature(assert_matches)]
 #![feature(hash_set_entry)]
+#![feature(let_chains)]
 
 pub mod bootstrap;
 pub mod cache_invalidator;
@@ -40,5 +40,6 @@ store-api.workspace = true
 tokio.workspace = true
 
 [dev-dependencies]
+common-meta = { workspace = true, features = ["testing"] }
 common-test-util.workspace = true
 mito2 = { workspace = true, features = ["test"] }
@@ -477,8 +477,9 @@ struct MetricEngineInner {
 mod test {
     use std::collections::HashMap;
 
+    use common_telemetry::info;
     use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
-    use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
+    use store_api::region_request::{RegionCloseRequest, RegionFlushRequest, RegionOpenRequest};
 
     use super::*;
     use crate::test_util::TestEnv;
@@ -563,4 +564,90 @@ mod test {
         assert!(env.metric().region_statistic(logical_region_id).is_none());
         assert!(env.metric().region_statistic(physical_region_id).is_some());
     }
+
+    #[tokio::test]
+    async fn test_open_region_failure() {
+        let env = TestEnv::new().await;
+        env.init_metric_region().await;
+        let physical_region_id = env.default_physical_region_id();
+
+        let metric_engine = env.metric();
+        metric_engine
+            .handle_request(
+                physical_region_id,
+                RegionRequest::Flush(RegionFlushRequest {
+                    row_group_size: None,
+                }),
+            )
+            .await
+            .unwrap();
+
+        let path = format!("{}/metadata/", env.default_region_dir());
+        let object_store = env.get_object_store().unwrap();
+        let list = object_store.list(&path).await.unwrap();
+        // Delete parquet files in metadata region
+        for entry in list {
+            if entry.metadata().is_dir() {
+                continue;
+            }
+            if entry.name().ends_with("parquet") {
+                info!("deleting {}", entry.path());
+                object_store.delete(entry.path()).await.unwrap();
+            }
+        }
+
+        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
+            .into_iter()
+            .collect();
+        let open_request = RegionOpenRequest {
+            engine: METRIC_ENGINE_NAME.to_string(),
+            region_dir: env.default_region_dir(),
+            options: physical_region_option,
+            skip_wal_replay: false,
+        };
+        // Opening an already opened region should succeed.
+        // Since the region is already open, no metadata recovery operations will be performed.
+        metric_engine
+            .handle_request(physical_region_id, RegionRequest::Open(open_request))
+            .await
+            .unwrap();
+
+        // Close the region
+        metric_engine
+            .handle_request(
+                physical_region_id,
+                RegionRequest::Close(RegionCloseRequest {}),
+            )
+            .await
+            .unwrap();
+
+        // Try to reopen region.
+        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
+            .into_iter()
+            .collect();
+        let open_request = RegionOpenRequest {
+            engine: METRIC_ENGINE_NAME.to_string(),
+            region_dir: env.default_region_dir(),
+            options: physical_region_option,
+            skip_wal_replay: false,
+        };
+        let err = metric_engine
+            .handle_request(physical_region_id, RegionRequest::Open(open_request))
+            .await
+            .unwrap_err();
+        // Failed to open region because of missing parquet files.
+        assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
+
+        let mito_engine = metric_engine.mito();
+        let data_region_id = utils::to_data_region_id(physical_region_id);
+        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
+        // The metadata/data region should be closed.
+        let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
+        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
+        let err = mito_engine
+            .get_metadata(metadata_region_id)
+            .await
+            .unwrap_err();
+        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
+    }
 }
@@ -66,7 +66,7 @@ impl MetricEngineInner {
             let mut manifest_infos = Vec::with_capacity(1);
             self.alter_logical_regions(physical_region_id, requests, extension_return_value)
                 .await?;
-            append_manifest_info(&self.mito, region_id, &mut manifest_infos);
+            append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
             encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
         } else {
             let grouped_requests =
@@ -222,13 +222,17 @@ mod test {
     use std::time::Duration;
 
-    use api::v1::SemanticType;
-    use datatypes::data_type::ConcreteDataType;
-    use datatypes::schema::ColumnSchema;
-    use store_api::metadata::ColumnMetadata;
-    use store_api::region_request::{AddColumn, SetRegionOption};
+    use common_meta::ddl::test_util::assert_column_name_and_id;
+    use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
+    use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
+    use store_api::region_engine::RegionEngine;
+    use store_api::region_request::{
+        AlterKind, BatchRegionDdlRequest, RegionAlterRequest, SetRegionOption,
+    };
+    use store_api::storage::consts::ReservedColumnId;
     use store_api::storage::RegionId;
 
     use super::*;
-    use crate::test_util::TestEnv;
+    use crate::test_util::{alter_logical_region_request, create_logical_region_request, TestEnv};
 
     #[tokio::test]
     async fn test_alter_region() {
@@ -239,22 +243,7 @@ mod test {
 
         // alter physical region
         let physical_region_id = env.default_physical_region_id();
-        let request = RegionAlterRequest {
-            kind: AlterKind::AddColumns {
-                columns: vec![AddColumn {
-                    column_metadata: ColumnMetadata {
-                        column_id: 0,
-                        semantic_type: SemanticType::Tag,
-                        column_schema: ColumnSchema::new(
-                            "tag1",
-                            ConcreteDataType::string_datatype(),
-                            false,
-                        ),
-                    },
-                    location: None,
-                }],
-            },
-        };
+        let request = alter_logical_region_request(&["tag1"]);
 
         let result = engine_inner
             .alter_physical_region(physical_region_id, request.clone())
@@ -287,14 +276,18 @@ mod test {
         assert!(!is_column_exist);
 
         let region_id = env.default_logical_region_id();
-        engine_inner
-            .alter_logical_regions(
-                physical_region_id,
-                vec![(region_id, request)],
-                &mut HashMap::new(),
-            )
+        let response = env
+            .metric()
+            .handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![(
+                region_id,
+                request.clone(),
+            )]))
             .await
             .unwrap();
+        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
+        assert_eq!(manifest_infos[0].0, physical_region_id);
+        assert!(manifest_infos[0].1.is_metric());
 
         let semantic_type = metadata_region
             .column_semantic_type(physical_region_id, logical_region_id, "tag1")
             .await
@@ -307,5 +300,77 @@ mod test {
             .unwrap()
             .unwrap();
         assert_eq!(timestamp_index, SemanticType::Timestamp);
+        let column_metadatas =
+            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
+        assert_column_name_and_id(
+            &column_metadatas,
+            &[
+                ("greptime_timestamp", 0),
+                ("greptime_value", 1),
+                ("__table_id", ReservedColumnId::table_id()),
+                ("__tsid", ReservedColumnId::tsid()),
+                ("job", 2),
+                ("tag1", 3),
+            ],
+        );
+    }
+
+    #[tokio::test]
+    async fn test_alter_logical_regions() {
+        let env = TestEnv::new().await;
+        let engine = env.metric();
+        let physical_region_id1 = RegionId::new(1024, 0);
+        let physical_region_id2 = RegionId::new(1024, 1);
+        let logical_region_id1 = RegionId::new(1025, 0);
+        let logical_region_id2 = RegionId::new(1025, 1);
+        env.create_physical_region(physical_region_id1, "/test_dir1")
+            .await;
+        env.create_physical_region(physical_region_id2, "/test_dir2")
+            .await;
+
+        let region_create_request1 = crate::test_util::create_logical_region_request(
+            &["job"],
+            physical_region_id1,
+            "logical1",
+        );
+        let region_create_request2 =
+            create_logical_region_request(&["job"], physical_region_id2, "logical2");
+        engine
+            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
+                (logical_region_id1, region_create_request1),
+                (logical_region_id2, region_create_request2),
+            ]))
+            .await
+            .unwrap();
+
+        let region_alter_request1 = alter_logical_region_request(&["tag1"]);
+        let region_alter_request2 = alter_logical_region_request(&["tag1"]);
+        let response = engine
+            .handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![
+                (logical_region_id1, region_alter_request1),
+                (logical_region_id2, region_alter_request2),
+            ]))
+            .await
+            .unwrap();
+
+        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
+        assert_eq!(manifest_infos.len(), 2);
+        let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
+        assert!(region_ids.contains(&physical_region_id1));
+        assert!(region_ids.contains(&physical_region_id2));
+
+        let column_metadatas =
+            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
+        assert_column_name_and_id(
+            &column_metadatas,
+            &[
+                ("greptime_timestamp", 0),
+                ("greptime_value", 1),
+                ("__table_id", ReservedColumnId::table_id()),
+                ("__tsid", ReservedColumnId::tsid()),
+                ("job", 2),
+                ("tag1", 3),
+            ],
+        );
+    }
 }
@@ -59,7 +59,7 @@ impl MetricEngineInner {
         }
     }
 
-    async fn close_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
+    pub(crate) async fn close_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
         let data_region_id = utils::to_data_region_id(region_id);
         let metadata_region_id = utils::to_metadata_region_id(region_id);
@@ -80,7 +80,8 @@ impl MetricEngineInner {
                 }
             );
             let (region_id, request) = requests.pop().unwrap();
-            self.create_physical_region(region_id, request).await?;
+            self.create_physical_region(region_id, request, extension_return_value)
+                .await?;
 
             return Ok(0);
         } else if first_request
@@ -122,6 +123,7 @@ impl MetricEngineInner {
         &self,
         region_id: RegionId,
         request: RegionCreateRequest,
+        extension_return_value: &mut HashMap<String, Vec<u8>>,
     ) -> Result<()> {
         let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
         let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
@@ -162,7 +164,8 @@ impl MetricEngineInner {
             .context(UnexpectedRequestSnafu {
                 reason: "No time index column found",
             })?;
-        self.mito
+        let response = self
+            .mito
             .handle_request(
                 data_region_id,
                 RegionRequest::Create(create_data_region_request),
@@ -176,6 +179,7 @@ impl MetricEngineInner {
                     region_id: data_region_id,
                 },
             )?;
+        extension_return_value.extend(response.extensions);
 
         info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
         PHYSICAL_REGION_COUNT.inc();
@@ -613,12 +617,15 @@ pub(crate) fn region_options_for_metadata_region(
 
 #[cfg(test)]
 mod test {
+    use common_meta::ddl::test_util::assert_column_name_and_id;
+    use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
     use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
+    use store_api::region_request::BatchRegionDdlRequest;
 
     use super::*;
     use crate::config::EngineConfig;
     use crate::engine::MetricEngine;
-    use crate::test_util::TestEnv;
+    use crate::test_util::{create_logical_region_request, TestEnv};
 
     #[test]
     fn test_verify_region_create_request() {
@@ -807,4 +814,50 @@ mod test {
         );
         assert!(!metadata_region_request.options.contains_key("skip_wal"));
     }
+
+    #[tokio::test]
+    async fn test_create_logical_regions() {
+        let env = TestEnv::new().await;
+        let engine = env.metric();
+        let physical_region_id1 = RegionId::new(1024, 0);
+        let physical_region_id2 = RegionId::new(1024, 1);
+        let logical_region_id1 = RegionId::new(1025, 0);
+        let logical_region_id2 = RegionId::new(1025, 1);
+        env.create_physical_region(physical_region_id1, "/test_dir1")
+            .await;
+        env.create_physical_region(physical_region_id2, "/test_dir2")
+            .await;
+
+        let region_create_request1 =
+            create_logical_region_request(&["job"], physical_region_id1, "logical1");
+        let region_create_request2 =
+            create_logical_region_request(&["job"], physical_region_id2, "logical2");
+
+        let response = engine
+            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
+                (logical_region_id1, region_create_request1),
+                (logical_region_id2, region_create_request2),
+            ]))
+            .await
+            .unwrap();
+
+        let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
+        assert_eq!(manifest_infos.len(), 2);
+        let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
+        assert!(region_ids.contains(&physical_region_id1));
+        assert!(region_ids.contains(&physical_region_id2));
+
+        let column_metadatas =
+            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
+        assert_column_name_and_id(
+            &column_metadatas,
+            &[
+                ("greptime_timestamp", 0),
+                ("greptime_value", 1),
+                ("__table_id", ReservedColumnId::table_id()),
+                ("__tsid", ReservedColumnId::tsid()),
+                ("job", 2),
+            ],
+        );
+    }
 }
@@ -17,7 +17,7 @@
 use api::region::RegionResponse;
 use api::v1::SemanticType;
 use common_error::ext::BoxedError;
-use common_telemetry::info;
+use common_telemetry::{error, info, warn};
 use datafusion::common::HashMap;
 use mito2::engine::MITO_ENGINE_NAME;
 use object_store::util::join_dir;
@@ -94,6 +94,21 @@ impl MetricEngineInner {
         Ok(responses)
     }
 
+    // If the metadata region is opened with a stale manifest,
+    // the metric engine may fail to recover logical tables from the metadata region,
+    // as the manifest could reference files that have already been deleted
+    // due to compaction operations performed by the region leader.
+    async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
+        info!(
+            "Closing metadata region {} and data region {} on metadata recovery failure",
+            utils::to_metadata_region_id(physical_region_id),
+            utils::to_data_region_id(physical_region_id)
+        );
+        if let Err(err) = self.close_physical_region(physical_region_id).await {
+            error!(err; "Failed to close physical region {}", physical_region_id);
+        }
+    }
+
     async fn open_physical_region_with_results(
         &self,
         metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
@@ -119,8 +134,14 @@ impl MetricEngineInner {
                 region_type: "data",
             })?;
 
-        self.recover_states(physical_region_id, physical_region_options)
-            .await?;
+        if let Err(err) = self
+            .recover_states(physical_region_id, physical_region_options)
+            .await
+        {
+            self.close_physical_region_on_recovery_failure(physical_region_id)
+                .await;
+            return Err(err);
+        }
         Ok(data_region_response)
     }
 
@@ -139,11 +160,31 @@ impl MetricEngineInner {
         request: RegionOpenRequest,
     ) -> Result<AffectedRows> {
         if request.is_physical_table() {
+            if self
+                .state
+                .read()
+                .unwrap()
+                .physical_region_states()
+                .get(&region_id)
+                .is_some()
+            {
+                warn!(
+                    "The physical region {} is already open, ignore the open request",
+                    region_id
+                );
+                return Ok(0);
+            }
             // open physical region and recover states
             let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
             self.open_physical_region(region_id, request).await?;
-            self.recover_states(region_id, physical_region_options)
-                .await?;
+            if let Err(err) = self
+                .recover_states(region_id, physical_region_options)
+                .await
+            {
+                self.close_physical_region_on_recovery_failure(region_id)
+                    .await;
+                return Err(err);
+            }
 
             Ok(0)
         } else {
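Both open paths above now follow a close-on-failure pattern: when state recovery fails after the underlying regions were opened, the half-open regions are closed (best effort) before the error propagates, so no stale handles linger. A compressed synchronous sketch of the control flow (the error and engine types are stand-ins, not the real async code):

```rust
#[derive(Debug)]
struct RecoverError(#[allow(dead_code)] String);

struct Engine;

impl Engine {
    fn open_region(&self, id: u64) {
        println!("opened region {id}");
    }

    fn recover_states(&self, id: u64) -> Result<(), RecoverError> {
        // Simulates recovery tripping over a stale manifest.
        Err(RecoverError(format!("stale manifest for region {id}")))
    }

    fn close_on_recovery_failure(&self, id: u64) {
        // Best effort: log and continue even if closing fails.
        println!("closing half-open region {id}");
    }

    fn open(&self, id: u64) -> Result<(), RecoverError> {
        self.open_region(id);
        // Do not leave the region half-open when recovery fails.
        if let Err(err) = self.recover_states(id) {
            self.close_on_recovery_failure(id);
            return Err(err);
        }
        Ok(())
    }
}

fn main() {
    let engine = Engine;
    assert!(engine.open(42).is_err()); // region 42 was closed again
}
```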
@@ -16,6 +16,7 @@
 
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value};
+use common_meta::ddl::utils::parse_column_metadatas;
 use common_telemetry::debug;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
@@ -23,14 +24,17 @@ use mito2::config::MitoConfig;
 use mito2::engine::MitoEngine;
 use mito2::test_util::TestEnv as MitoTestEnv;
 use object_store::util::join_dir;
+use object_store::ObjectStore;
 use store_api::metadata::ColumnMetadata;
 use store_api::metric_engine_consts::{
-    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
+    ALTER_PHYSICAL_EXTENSION_KEY, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
+    PHYSICAL_TABLE_METADATA_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
 };
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{
     AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionOpenRequest, RegionRequest,
 };
+use store_api::storage::consts::ReservedColumnId;
 use store_api::storage::{ColumnId, RegionId};
 
 use crate::config::EngineConfig;
@@ -74,6 +78,10 @@ impl TestEnv {
         join_dir(&env_root, "data")
     }
 
+    pub fn get_object_store(&self) -> Option<ObjectStore> {
+        self.mito_env.get_object_store()
+    }
+
     /// Returns a reference to the engine.
     pub fn mito(&self) -> MitoEngine {
         self.mito.clone()
@@ -111,13 +119,8 @@ impl TestEnv {
         (mito, metric)
     }
 
-    /// Create regions in [MetricEngine] under [`default_region_id`]
-    /// and region dir `"test_metric_region"`.
-    ///
-    /// This method will create one logical region with three columns `(ts, val, job)`
-    /// under [`default_logical_region_id`].
-    pub async fn init_metric_region(&self) {
-        let region_id = self.default_physical_region_id();
+    /// Create regions in [MetricEngine] with specific `physical_region_id`.
+    pub async fn create_physical_region(&self, physical_region_id: RegionId, region_dir: &str) {
         let region_create_request = RegionCreateRequest {
             engine: METRIC_ENGINE_NAME.to_string(),
             column_metadatas: vec![
@@ -144,26 +147,88 @@ impl TestEnv {
             options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
                 .into_iter()
                 .collect(),
-            region_dir: self.default_region_dir(),
+            region_dir: region_dir.to_string(),
         };
 
         // create physical region
-        self.metric()
-            .handle_request(region_id, RegionRequest::Create(region_create_request))
+        let response = self
+            .metric()
+            .handle_request(
+                physical_region_id,
+                RegionRequest::Create(region_create_request),
+            )
             .await
             .unwrap();
+        let column_metadatas =
+            parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY)
+                .unwrap();
+        assert_eq!(column_metadatas.len(), 4);
+    }
 
-        // create logical region
-        let region_id = self.default_logical_region_id();
-        let region_create_request = create_logical_region_request(
-            &["job"],
-            self.default_physical_region_id(),
-            "test_metric_logical_region",
-        );
-        self.metric()
-            .handle_request(region_id, RegionRequest::Create(region_create_request))
+    /// Create logical region in [MetricEngine] with specific `physical_region_id` and `logical_region_id`.
+    pub async fn create_logical_region(
+        &self,
+        physical_region_id: RegionId,
+        logical_region_id: RegionId,
+    ) {
+        let region_create_request = create_logical_region_request(
+            &["job"],
+            physical_region_id,
+            "test_metric_logical_region",
+        );
+        let response = self
+            .metric()
+            .handle_request(
+                logical_region_id,
+                RegionRequest::Create(region_create_request),
+            )
             .await
             .unwrap();
+        let column_metadatas =
+            parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
+        assert_eq!(column_metadatas.len(), 5);
+        let column_names = column_metadatas
+            .iter()
+            .map(|c| c.column_schema.name.as_str())
+            .collect::<Vec<_>>();
+        let column_ids = column_metadatas
+            .iter()
+            .map(|c| c.column_id)
+            .collect::<Vec<_>>();
+        assert_eq!(
+            column_names,
+            vec![
+                "greptime_timestamp",
+                "greptime_value",
+                "__table_id",
+                "__tsid",
+                "job",
+            ]
+        );
+        assert_eq!(
+            column_ids,
+            vec![
+                0,
+                1,
+                ReservedColumnId::table_id(),
+                ReservedColumnId::tsid(),
+                2,
+            ]
+        );
+    }
+
+    /// Create regions in [MetricEngine] under [`default_region_id`]
+    /// and region dir `"test_metric_region"`.
+    ///
+    /// This method will create one logical region with three columns `(ts, val, job)`
+    /// under [`default_logical_region_id`].
+    pub async fn init_metric_region(&self) {
+        let physical_region_id = self.default_physical_region_id();
+        self.create_physical_region(physical_region_id, &self.default_region_dir())
+            .await;
+        let logical_region_id = self.default_logical_region_id();
+        self.create_logical_region(physical_region_id, logical_region_id)
+            .await;
+    }
 
     pub fn metadata_region(&self) -> MetadataRegion {
@@ -269,6 +334,30 @@ pub fn create_logical_region_request(
     }
 }
 
+/// Generate a [RegionAlterRequest] for a logical region.
+/// Only the tag columns' names need to be specified.
+pub fn alter_logical_region_request(tags: &[&str]) -> RegionAlterRequest {
+    RegionAlterRequest {
+        kind: AlterKind::AddColumns {
+            columns: tags
+                .iter()
+                .map(|tag| AddColumn {
+                    column_metadata: ColumnMetadata {
+                        column_id: 0,
+                        semantic_type: SemanticType::Tag,
+                        column_schema: ColumnSchema::new(
+                            tag.to_string(),
+                            ConcreteDataType::string_datatype(),
+                            false,
+                        ),
+                    },
+                    location: None,
+                })
+                .collect::<Vec<_>>(),
+        },
+    }
+}
+
 /// Generate a row schema with given tag columns.
 ///
 /// The result will also contain the default timestamp and value columns at the beginning.
@@ -80,8 +80,10 @@ use snafu::{ensure, OptionExt, ResultExt};
 use store_api::codec::PrimaryKeyEncoding;
 use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
-use store_api::metadata::RegionMetadataRef;
-use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
+use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
+use store_api::metric_engine_consts::{
+    MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
+};
 use store_api::region_engine::{
     BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
     RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
@@ -95,7 +97,7 @@ use crate::cache::CacheStrategy;
 use crate::config::MitoConfig;
 use crate::error::{
     InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
-    SerdeJsonSnafu,
+    SerdeJsonSnafu, SerializeColumnMetadataSnafu,
 };
 #[cfg(feature = "enterprise")]
 use crate::extension::BoxedExtensionRangeProviderFactory;
@@ -335,6 +337,22 @@ impl MitoEngine {
         Ok(())
     }
 
+    fn encode_column_metadatas_to_extensions(
+        region_id: &RegionId,
+        column_metadatas: Vec<ColumnMetadata>,
+        extensions: &mut HashMap<String, Vec<u8>>,
+    ) -> Result<()> {
+        extensions.insert(
+            TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
+            ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
+        );
+        info!(
+            "Added column metadatas: {:?} to extensions, region_id: {:?}",
+            column_metadatas, region_id
+        );
+        Ok(())
+    }
+
     /// Find the current version's memtables and SSTs stats by region_id.
     /// The stats must be collected in one place one time to ensure data consistency.
     pub fn find_memtable_and_sst_stats(
@@ -695,6 +713,7 @@ impl RegionEngine for MitoEngine {
             .start_timer();
 
         let is_alter = matches!(request, RegionRequest::Alter(_));
+        let is_create = matches!(request, RegionRequest::Create(_));
         let mut response = self
             .inner
             .handle_request(region_id, request)
@@ -703,14 +722,11 @@ impl RegionEngine for MitoEngine {
             .map_err(BoxedError::new)?;
 
         if is_alter {
-            if let Some(statistic) = self.region_statistic(region_id) {
-                Self::encode_manifest_info_to_extensions(
-                    &region_id,
-                    statistic.manifest,
-                    &mut response.extensions,
-                )
+            self.handle_alter_response(region_id, &mut response)
                 .map_err(BoxedError::new)?;
-            }
+        } else if is_create {
+            self.handle_create_response(region_id, &mut response)
+                .map_err(BoxedError::new)?;
         }
 
         Ok(response)
@@ -803,6 +819,55 @@ impl RegionEngine for MitoEngine {
     }
 }
 
+impl MitoEngine {
+    fn handle_alter_response(
+        &self,
+        region_id: RegionId,
+        response: &mut RegionResponse,
+    ) -> Result<()> {
+        if let Some(statistic) = self.region_statistic(region_id) {
+            Self::encode_manifest_info_to_extensions(
+                &region_id,
+                statistic.manifest,
+                &mut response.extensions,
+            )?;
+        }
+        let column_metadatas = self
+            .inner
+            .find_region(region_id)
+            .ok()
+            .map(|r| r.metadata().column_metadatas.clone());
+        if let Some(column_metadatas) = column_metadatas {
+            Self::encode_column_metadatas_to_extensions(
+                &region_id,
+                column_metadatas,
+                &mut response.extensions,
+            )?;
+        }
+        Ok(())
+    }
+
+    fn handle_create_response(
+        &self,
+        region_id: RegionId,
+        response: &mut RegionResponse,
+    ) -> Result<()> {
+        let column_metadatas = self
+            .inner
+            .find_region(region_id)
+            .ok()
+            .map(|r| r.metadata().column_metadatas.clone());
+        if let Some(column_metadatas) = column_metadatas {
+            Self::encode_column_metadatas_to_extensions(
+                &region_id,
+                column_metadatas,
+                &mut response.extensions,
+            )?;
+        }
+        Ok(())
+    }
+}
+
 // Tests methods.
 #[cfg(any(test, feature = "test"))]
 #[allow(clippy::too_many_arguments)]
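The extension mechanism in `encode_column_metadatas_to_extensions` is just a string-keyed map of serialized bytes riding along on the region response; `SerializeColumnMetadata` wraps the `serde_json` failure. A hedged sketch of the round trip, assuming `serde` (with the `derive` feature) and `serde_json`; the key and the `ColMeta` struct are illustrative stand-ins, not the real `ColumnMetadata`:

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct ColMeta {
    name: String,
    column_id: u32,
}

// Illustrative stand-in for TABLE_COLUMN_METADATA_EXTENSION_KEY.
const KEY: &str = "table_column_metadata";

fn encode(metas: &[ColMeta], extensions: &mut HashMap<String, Vec<u8>>) -> serde_json::Result<()> {
    // Serialize the whole list into one opaque byte value under a known key.
    extensions.insert(KEY.to_string(), serde_json::to_vec(metas)?);
    Ok(())
}

fn decode(extensions: &HashMap<String, Vec<u8>>) -> Option<Vec<ColMeta>> {
    extensions.get(KEY).and_then(|b| serde_json::from_slice(b).ok())
}

fn main() {
    let metas = vec![ColMeta { name: "ts".into(), column_id: 0 }];
    let mut extensions = HashMap::new();
    encode(&metas, &mut extensions).unwrap();
    assert_eq!(decode(&extensions).unwrap(), metas);
}
```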
@@ -20,16 +20,18 @@ use std::time::Duration;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions};
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
use store_api::region_engine::{RegionEngine, RegionRole};
|
||||
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
|
||||
use store_api::region_engine::{RegionEngine, RegionManifestInfo, RegionRole};
|
||||
use store_api::region_request::{
|
||||
AddColumn, AddColumnLocation, AlterKind, ApiSetIndexOptions, RegionAlterRequest,
|
||||
RegionOpenRequest, RegionRequest, SetRegionOption,
|
||||
};
|
||||
use store_api::storage::{RegionId, ScanRequest};
|
||||
use store_api::storage::{ColumnId, RegionId, ScanRequest};
|
||||
|
||||
use crate::config::MitoConfig;
|
||||
use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
|
||||
@@ -113,6 +115,17 @@ fn check_region_version(
|
||||
assert_eq!(flushed_sequence, version_data.version.flushed_sequence);
|
||||
}
|
||||
|
||||
fn assert_column_metadatas(column_name: &[(&str, ColumnId)], column_metadatas: &[ColumnMetadata]) {
|
||||
assert_eq!(column_name.len(), column_metadatas.len());
|
||||
for (name, id) in column_name {
|
||||
let column_metadata = column_metadatas
|
||||
.iter()
|
||||
.find(|c| c.column_id == *id)
|
||||
.unwrap();
|
||||
assert_eq!(column_metadata.column_schema.name, *name);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_region() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
@@ -136,10 +149,16 @@ async fn test_alter_region() {

     let column_schemas = rows_schema(&request);
     let region_dir = request.region_dir.clone();
-    engine
+    let response = engine
         .handle_request(region_id, RegionRequest::Create(request))
         .await
         .unwrap();
+    let column_metadatas =
+        parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY).unwrap();
+    assert_column_metadatas(
+        &[("tag_0", 0), ("field_0", 1), ("ts", 2)],
+        &column_metadatas,
+    );

     let rows = Rows {
         schema: column_schemas,
@@ -148,7 +167,7 @@ async fn test_alter_region() {
     put_rows(&engine, region_id, rows).await;

     let request = add_tag1();
-    engine
+    let response = engine
         .handle_request(region_id, RegionRequest::Alter(request))
         .await
         .unwrap();
@@ -164,6 +183,18 @@ async fn test_alter_region() {
     scan_check_after_alter(&engine, region_id, expected).await;
     check_region_version(&engine, region_id, 1, 3, 1, 3);

+    let mut manifests = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
+    assert_eq!(manifests.len(), 1);
+    let (return_region_id, manifest) = manifests.remove(0);
+    assert_eq!(return_region_id, region_id);
+    assert_eq!(manifest, RegionManifestInfo::mito(2, 1));
+    let column_metadatas =
+        parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY).unwrap();
+    assert_column_metadatas(
+        &[("tag_0", 0), ("field_0", 1), ("ts", 2), ("tag_1", 3)],
+        &column_metadatas,
+    );
+
     // Reopen region.
     let engine = env.reopen_engine(engine, MitoConfig::default()).await;
     engine

@@ -96,6 +96,14 @@ pub enum Error {
         error: serde_json::Error,
     },

+    #[snafu(display("Failed to serialize column metadata"))]
+    SerializeColumnMetadata {
+        #[snafu(source)]
+        error: serde_json::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Invalid scan index, start: {}, end: {}", start, end))]
     InvalidScanIndex {
         start: ManifestVersion,
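The new variant follows the crate's usual snafu pattern: `#[snafu(source)]` wraps the underlying serde_json error, and `#[snafu(implicit)]` captures the caller's source location automatically. A self-contained sketch of how the generated `SerializeColumnMetadataSnafu` selector would be used at a serialization site (the surrounding engine types are omitted; `encode` is illustrative):

    use snafu::{Location, ResultExt, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Failed to serialize column metadata"))]
        SerializeColumnMetadata {
            #[snafu(source)]
            error: serde_json::Error,
            #[snafu(implicit)]
            location: Location,
        },
    }

    // `context` converts the serde_json error into the variant above and
    // fills in `location` implicitly at this call site.
    fn encode(column_names: &[String]) -> Result<Vec<u8>, Error> {
        serde_json::to_vec(column_names).context(SerializeColumnMetadataSnafu)
    }
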
@@ -1063,7 +1071,8 @@ impl ErrorExt for Error {
             | NoCheckpoint { .. }
             | NoManifests { .. }
             | InstallManifestTo { .. }
-            | Unexpected { .. } => StatusCode::Unexpected,
+            | Unexpected { .. }
+            | SerializeColumnMetadata { .. } => StatusCode::Unexpected,

             RegionNotFound { .. } => StatusCode::RegionNotFound,
             ObjectStoreNotFound { .. }

@@ -6,7 +6,7 @@ license.workspace = true

 [features]
 testing = []
-enterprise = ["common-meta/enterprise", "sql/enterprise"]
+enterprise = ["common-meta/enterprise", "sql/enterprise", "query/enterprise"]

 [lints]
 workspace = true
@@ -35,6 +35,7 @@ common-meta.workspace = true
 common-query.workspace = true
 common-recordbatch.workspace = true
 common-runtime.workspace = true
+common-sql.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
 datafusion.workspace = true

@@ -829,13 +829,6 @@ pub enum Error {
         location: Location,
     },

-    #[cfg(feature = "enterprise")]
-    #[snafu(display("Trigger related operations are not currently supported"))]
-    UnsupportedTrigger {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Invalid time index type: {}", ty))]
     InvalidTimeIndexType {
         ty: arrow::datatypes::DataType,
@@ -851,6 +844,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Sql common error"))]
+    SqlCommon {
+        source: common_sql::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -902,8 +902,6 @@ impl ErrorExt for Error {
             Error::NotSupported { .. }
             | Error::ShowCreateTableBaseOnly { .. }
             | Error::SchemaReadOnly { .. } => StatusCode::Unsupported,
-            #[cfg(feature = "enterprise")]
-            Error::UnsupportedTrigger { .. } => StatusCode::Unsupported,
             Error::TableMetadataManager { source, .. } => source.status_code(),
             Error::ParseSql { source, .. } => source.status_code(),
             Error::InvalidateTableCache { source, .. } => source.status_code(),
@@ -981,6 +979,7 @@ impl ErrorExt for Error {
             Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
             Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
             Error::PathNotFound { .. } => StatusCode::InvalidArguments,
+            Error::SqlCommon { source, .. } => source.status_code(),
         }
     }

@@ -22,7 +22,6 @@ use datatypes::schema::{ColumnSchema, SchemaRef};
 use partition::manager::PartitionRuleManager;
 use session::context::{QueryContext, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
-use sql::statements;
 use sql::statements::insert::Insert;
 use sqlparser::ast::{ObjectName, Value as SqlValue};
 use table::metadata::TableInfoRef;
@@ -227,7 +226,7 @@ fn sql_value_to_grpc_value(
                 column: column.clone(),
             })?
     } else {
-        statements::sql_value_to_value(
+        common_sql::convert::sql_value_to_value(
             column,
             &column_schema.data_type,
             sql_val,
@@ -235,7 +234,7 @@ fn sql_value_to_grpc_value(
             None,
             auto_string_to_numeric,
         )
-        .context(ParseSqlSnafu)?
+        .context(crate::error::SqlCommonSnafu)?
     };

     let grpc_value = value_to_grpc_value(value);

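The converter keeps its call shape across this refactor; only its path (`sql::statements` to `common_sql::convert`) and the error wrapper change. A hedged in-repo sketch of the call, with the parameter meanings inferred from the call sites in this PR rather than from the function's documentation:

    use common_sql::convert::sql_value_to_value;
    use common_time::Timezone;
    use datatypes::prelude::{ConcreteDataType, Value};
    use snafu::ResultExt;
    use sqlparser::ast::Value as SqlValue;

    // Inferred argument order: column name (for error messages), target type,
    // the parsed SQL literal, optional session timezone, optional unary
    // operator, and whether strings may be auto-coerced to numbers.
    fn convert_literal(
        column: &str,
        data_type: &ConcreteDataType,
        sql_val: &SqlValue,
        tz: Option<&Timezone>,
    ) -> crate::error::Result<Value> {
        sql_value_to_value(column, data_type, sql_val, tz, None, false)
            .context(crate::error::SqlCommonSnafu)
    }
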
@@ -19,6 +19,7 @@ use common_function::function_registry::FUNCTION_REGISTRY;
 use common_query::prelude::TypeSignature;
 use common_query::Output;
 use common_recordbatch::{RecordBatch, RecordBatches};
+use common_sql::convert::sql_value_to_value;
 use common_telemetry::tracing;
 use common_time::Timezone;
 use datatypes::data_type::DataType;
@@ -30,7 +31,6 @@ use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use sql::ast::{Expr, FunctionArg, FunctionArgExpr, FunctionArguments, Value as SqlValue};
 use sql::statements::admin::Admin;
-use sql::statements::sql_value_to_value;

 use crate::error::{self, Result};
 use crate::statement::StatementExecutor;
@@ -186,7 +186,7 @@ fn values_to_vectors_by_exact_types(
         .zip(exact_types.iter())
         .map(|(value, data_type)| {
             let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
-                .context(error::ParseSqlValueSnafu)?;
+                .context(error::SqlCommonSnafu)?;

             Ok(value_to_vector(value))
         })

@@ -45,6 +45,7 @@ use common_meta::rpc::ddl::{
 };
 use common_meta::rpc::router::{Partition, Partition as MetaPartition};
 use common_query::Output;
+use common_sql::convert::sql_value_to_value;
 use common_telemetry::{debug, info, tracing, warn};
 use common_time::Timezone;
 use datafusion_common::tree_node::TreeNodeVisitor;
@@ -71,7 +72,6 @@ use sql::statements::create::trigger::CreateTrigger;
 use sql::statements::create::{
     CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
 };
-use sql::statements::sql_value_to_value;
 use sql::statements::statement::Statement;
 use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
 use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
@@ -87,10 +87,10 @@ use crate::error::{
     ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
     DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
     FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
-    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
-    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
-    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
-    UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
+    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, Result, SchemaInUseSnafu,
+    SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
+    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
+    ViewAlreadyExistsSnafu,
 };
 use crate::expr_helper;
 use crate::statement::show::create_partitions_stmt;
@@ -1859,7 +1859,7 @@ fn convert_value(
         unary_op,
         false,
     )
-    .context(ParseSqlValueSnafu)
+    .context(error::SqlCommonSnafu)
 }

 #[cfg(test)]

@@ -231,10 +231,12 @@ impl StatementExecutor {
     #[tracing::instrument(skip_all)]
     pub(super) async fn show_triggers(
         &self,
-        _stmt: sql::statements::show::trigger::ShowTriggers,
-        _query_ctx: QueryContextRef,
+        stmt: sql::statements::show::trigger::ShowTriggers,
+        query_ctx: QueryContextRef,
     ) -> Result<Output> {
-        crate::error::UnsupportedTriggerSnafu.fail()
+        query::sql::show_triggers(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
+            .await
+            .context(ExecuteStatementSnafu)
     }

     #[tracing::instrument(skip_all)]

@@ -7,6 +7,9 @@ license.workspace = true
 [lints]
 workspace = true

+[features]
+enterprise = []
+
 [dependencies]
 ahash.workspace = true
 api.workspace = true

@@ -155,7 +155,23 @@ struct PlanRewriter {
    /// Partition columns of the table in current pass
    partition_cols: Option<Vec<String>>,
    column_requirements: HashSet<Column>,
    /// Whether to expand on the next call.
    /// This handles the case where a plan is transformed but needs to be expanded from its
    /// parent node. For example, an Aggregate plan is split into two parts in the frontend and
    /// the datanode, and needs to be expanded from the parent node of the Aggregate plan.
    expand_on_next_call: bool,
    /// Expand on the next partial/conditional/transformed commutative plan.
    /// This handles the case where a plan is transformed but we still need to push down as
    /// many nodes as possible before the next partial/conditional/transformed commutative
    /// plan. I.e.
    /// ```
    /// Limit:
    ///     Sort:
    /// ```
    /// where `Limit` is partial commutative and `Sort` is conditional commutative.
    /// In this case, we need to expand the `Limit` plan
    /// so that we can push down the `Sort` plan as much as possible.
    expand_on_next_part_cond_trans_commutative: bool,
    new_child_plan: Option<LogicalPlan>,
}

@@ -177,15 +193,38 @@ impl PlanRewriter {
         {
             return true;
         }

+        if self.expand_on_next_call {
+            self.expand_on_next_call = false;
+            return true;
+        }
+
+        if self.expand_on_next_part_cond_trans_commutative {
+            let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
+            match comm {
+                Commutativity::PartialCommutative => {
+                    // A small difference: for partial commutative plans we still need to
+                    // expand on the next call (so `Limit` can be pushed down).
+                    self.expand_on_next_part_cond_trans_commutative = false;
+                    self.expand_on_next_call = true;
+                }
+                Commutativity::ConditionalCommutative(_)
+                | Commutativity::TransformedCommutative { .. } => {
+                    // For conditional commutative and transformed commutative plans,
+                    // we can expand now.
+                    self.expand_on_next_part_cond_trans_commutative = false;
+                    return true;
+                }
+                _ => (),
+            }
+        }
+
         match Categorizer::check_plan(plan, self.partition_cols.clone()) {
             Commutativity::Commutative => {}
             Commutativity::PartialCommutative => {
                 if let Some(plan) = partial_commutative_transformer(plan) {
+                    self.update_column_requirements(&plan);
+                    self.expand_on_next_part_cond_trans_commutative = true;
                     self.stage.push(plan)
                 }
             }
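The interplay of the two flags is easier to see in isolation. Below is a distilled, self-contained model of the decision loop (toy enum and names; the real Categorizer, LogicalPlan, and stage handling are far richer): a partial commutative node defers expansion by one extra step so that both it and the node after it can be pushed down.

    // Toy model of the two expansion flags used by PlanRewriter above.
    #[allow(dead_code)]
    #[derive(Clone, Copy)]
    enum Commutativity {
        Commutative,
        PartialCommutative,
        ConditionalCommutative,
        NonCommutative,
    }

    #[derive(Default)]
    struct Rewriter {
        expand_on_next_call: bool,
        expand_on_next_part_cond_trans_commutative: bool,
    }

    impl Rewriter {
        /// Returns true when the walk should stop and expand at this node.
        fn should_expand(&mut self, node: Commutativity) -> bool {
            if self.expand_on_next_call {
                self.expand_on_next_call = false;
                return true;
            }
            if self.expand_on_next_part_cond_trans_commutative {
                match node {
                    // e.g. `Limit` over `Sort`: push this node down too, but
                    // arm the one-shot flag so the *next* node expands.
                    Commutativity::PartialCommutative => {
                        self.expand_on_next_part_cond_trans_commutative = false;
                        self.expand_on_next_call = true;
                    }
                    Commutativity::ConditionalCommutative => {
                        self.expand_on_next_part_cond_trans_commutative = false;
                        return true;
                    }
                    _ => {}
                }
            }
            matches!(node, Commutativity::NonCommutative)
        }
    }

    fn main() {
        let mut r = Rewriter::default();
        // A Limit (partial commutative) arms the one-shot flag...
        r.expand_on_next_part_cond_trans_commutative = true;
        assert!(!r.should_expand(Commutativity::PartialCommutative));
        // ...so the node right after it triggers expansion.
        assert!(r.should_expand(Commutativity::Commutative));
    }
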
@@ -194,6 +233,7 @@ impl PlanRewriter {
                 && let Some(plan) = transformer(plan)
             {
                 self.update_column_requirements(&plan);
+                self.expand_on_next_part_cond_trans_commutative = true;
                 self.stage.push(plan)
             }
         }
@@ -202,7 +242,7 @@ impl PlanRewriter {
                 && let Some(transformer_actions) = transformer(plan)
             {
                 debug!(
-                    "PlanRewriter: transformed plan: {:#?}\n from {plan}",
+                    "PlanRewriter: transformed plan: {:?}\n from {plan}",
                     transformer_actions.extra_parent_plans
                 );
                 if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
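The only change here is the format spec, presumably to keep log lines compact: `{:#?}` pretty-prints `Debug` output across many indented lines, which bloats logs for deep plan trees, while `{:?}` keeps it on one line. A minimal illustration:

    fn main() {
        let plans = vec![("Limit", 1), ("Sort", 2)];
        println!("{:?}", plans);  // one line: [("Limit", 1), ("Sort", 2)]
        println!("{:#?}", plans); // multi-line, indented pretty-print
    }
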
@@ -226,6 +266,10 @@ impl PlanRewriter {
     }

     fn update_column_requirements(&mut self, plan: &LogicalPlan) {
+        debug!(
+            "PlanRewriter: update column requirements for plan: {plan}\n with column_requirements: {:?}",
+            self.column_requirements
+        );
         let mut container = HashSet::new();
         for expr in plan.expressions() {
             // this method won't fail
@@ -235,6 +279,10 @@ impl PlanRewriter {
         for col in container {
             self.column_requirements.insert(col);
         }
+        debug!(
+            "PlanRewriter: updated column requirements: {:?}",
+            self.column_requirements
+        );
     }

     fn is_expanded(&self) -> bool {

@@ -181,6 +181,15 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
         is_batch_coalesced = true;
     }

+    // Only a very limited set of plans may sit between the region scan and the sort exec;
+    // other plans might make this optimization wrong, so be safe here by limiting it.
+    if !(plan.as_any().is::<ProjectionExec>()
+        || plan.as_any().is::<FilterExec>()
+        || plan.as_any().is::<CoalesceBatchesExec>())
+    {
+        partition_ranges = None;
+    }
+
     // TODO(discord9): do this in the logical plan instead, as it's less buggy there
     // Collects alias of the time index column.
     if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
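The whitelist above relies on `Any`-based type tests. A self-contained sketch of the same pattern with stand-in plan types (the real nodes are DataFusion's ProjectionExec, FilterExec, and CoalesceBatchesExec):

    use std::any::Any;

    // Stand-ins for the DataFusion operators named above.
    struct ProjectionExec;
    struct FilterExec;
    struct CoalesceBatchesExec;
    struct SomethingElseExec;

    // Returns true when `plan` is one of the operators trusted to preserve
    // the per-range ordering that windowed sort depends on.
    fn is_transparent_for_windowed_sort(plan: &dyn Any) -> bool {
        plan.is::<ProjectionExec>()
            || plan.is::<FilterExec>()
            || plan.is::<CoalesceBatchesExec>()
    }

    fn main() {
        assert!(is_transparent_for_windowed_sort(&ProjectionExec));
        assert!(!is_transparent_for_windowed_sort(&SomethingElseExec));
    }
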
@@ -194,6 +203,14 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
     }

     if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
+        // `PerSeries` distribution is not supported in windowed sort.
+        if region_scan_exec.distribution()
+            == Some(store_api::storage::TimeSeriesDistribution::PerSeries)
+        {
+            partition_ranges = None;
+            return Ok(Transformed::no(plan));
+        }
+
         partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
         // Reset time index column.
         time_index = HashSet::from([region_scan_exec.time_index()]);
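Why the guard matters (my reading of the change; the diff itself does not spell it out): windowed sort exploits the fact that each partition range yields rows already ordered by time, but with `PerSeries` distribution the scan emits all rows of one series before the next, so the stream is no longer globally time-ordered and the optimization has to bail out. A toy illustration:

    // With per-series output, timestamps across the stream are not monotone
    // even when each series is internally time-sorted.
    #[allow(dead_code)]
    struct Row {
        series: u32,
        ts: i64,
    }

    fn main() {
        // Per-series output order: all of series 0 first, then series 1.
        let per_series = vec![
            Row { series: 0, ts: 10 },
            Row { series: 0, ts: 30 },
            Row { series: 1, ts: 20 },
        ];
        // Windowed sort would need timestamps to be non-decreasing already.
        let time_ordered = per_series.windows(2).all(|w| w[0].ts <= w[1].ts);
        assert!(!time_ordered);
    }
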
@@ -96,9 +96,10 @@ impl PartSortExec {

         if partition >= self.partition_ranges.len() {
             internal_err!(
-                "Partition index out of range: {} >= {}",
+                "Partition index out of range: {} >= {} at {}",
                 partition,
-                self.partition_ranges.len()
+                self.partition_ranges.len(),
+                snafu::location!()
             )?;
         }

@@ -322,9 +323,10 @@ impl PartSortStream {
     ) -> datafusion_common::Result<()> {
         if self.cur_part_idx >= self.partition_ranges.len() {
             internal_err!(
-                "Partition index out of range: {} >= {}",
+                "Partition index out of range: {} >= {} at {}",
                 self.cur_part_idx,
-                self.partition_ranges.len()
+                self.partition_ranges.len(),
+                snafu::location!()
             )?;
         }
         let cur_range = self.partition_ranges[self.cur_part_idx];
@@ -355,9 +357,10 @@ impl PartSortStream {
         // check if the current partition index is out of range
         if self.cur_part_idx >= self.partition_ranges.len() {
             internal_err!(
-                "Partition index out of range: {} >= {}",
+                "Partition index out of range: {} >= {} at {}",
                 self.cur_part_idx,
-                self.partition_ranges.len()
+                self.partition_ranges.len(),
+                snafu::location!()
             )?;
         }
         let cur_range = self.partition_ranges[self.cur_part_idx];

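All three messages gain an `at {}` suffix fed by `snafu::location!()`, which expands to the file, line, and column of the call site; the resulting `Location` implements `Display` (the `{}` formatting in the diff relies on that). A minimal standalone illustration:

    // Requires the snafu crate; location!() captures file:line:column of the
    // macro invocation.
    fn check_bounds(idx: usize, len: usize) -> Result<(), String> {
        if idx >= len {
            return Err(format!(
                "Partition index out of range: {} >= {} at {}",
                idx,
                len,
                snafu::location!()
            ));
        }
        Ok(())
    }

    fn main() {
        let err = check_bounds(5, 3).unwrap_err();
        println!("{err}"); // e.g. "... at src/main.rs:7:17"
    }
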
@@ -950,6 +950,37 @@ pub async fn show_flows(
     .await
 }

+#[cfg(feature = "enterprise")]
+pub async fn show_triggers(
+    stmt: sql::statements::show::trigger::ShowTriggers,
+    query_engine: &QueryEngineRef,
+    catalog_manager: &CatalogManagerRef,
+    query_ctx: QueryContextRef,
+) -> Result<Output> {
+    use catalog::information_schema::TRIGGER_LIST;
+
+    const TRIGGER_NAME: &str = "trigger_name";
+    const TRIGGERS_COLUMN: &str = "Triggers";
+
+    let projects = vec![(TRIGGER_NAME, TRIGGERS_COLUMN)];
+    let like_field = Some(TRIGGER_NAME);
+    let sort = vec![col(TRIGGER_NAME).sort(true, true)];
+
+    query_from_information_schema_table(
+        query_engine,
+        catalog_manager,
+        query_ctx,
+        TRIGGER_LIST,
+        vec![],
+        projects,
+        vec![],
+        like_field,
+        sort,
+        stmt.kind,
+    )
+    .await
+}
+
 pub fn show_create_flow(
     flow_name: ObjectName,
     flow_val: FlowInfoValue,

@@ -48,6 +48,7 @@ common-query.workspace = true
 common-recordbatch.workspace = true
 common-runtime.workspace = true
 common-session.workspace = true
+common-sql.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
 common-version = { workspace = true, features = ["codec"] }

@@ -17,6 +17,7 @@ use std::time::Duration;

 use chrono::NaiveDate;
 use common_query::prelude::ScalarValue;
+use common_sql::convert::sql_value_to_value;
 use common_time::Timestamp;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_expr::LogicalPlan;
@@ -27,7 +28,6 @@ use itertools::Itertools;
 use opensrv_mysql::{to_naive_datetime, ParamValue, ValueInner};
 use snafu::ResultExt;
 use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut};
-use sql::statements::sql_value_to_value;
 use sql::statements::statement::Statement;

 use crate::error::{self, DataFusionSnafu, Result};

@@ -19,6 +19,7 @@ common-decimal.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
 common-query.workspace = true
+common-sql.workspace = true
 common-time.workspace = true
 datafusion.workspace = true
 datafusion-common.workspace = true

@@ -17,16 +17,13 @@ use std::any::Any;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
-use common_time::timestamp::TimeUnit;
-use common_time::Timestamp;
 use datafusion_common::DataFusionError;
-use datafusion_sql::sqlparser::ast::UnaryOperator;
 use datatypes::prelude::{ConcreteDataType, Value};
 use snafu::{Location, Snafu};
 use sqlparser::ast::Ident;
 use sqlparser::parser::ParserError;

-use crate::ast::{Expr, Value as SqlValue};
+use crate::ast::Expr;
 use crate::parsers::error::TQLError;

 pub type Result<T> = std::result::Result<T, Error>;
@@ -59,18 +56,6 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display(
-        "Unsupported expr in default constraint: {:?} for column: {}",
-        expr,
-        column_name
-    ))]
-    UnsupportedDefaultValue {
-        column_name: String,
-        expr: Expr,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     // Syntax error from sql parser.
     #[snafu(display("Invalid SQL syntax"))]
     Syntax {
@@ -218,30 +203,6 @@ pub enum Error {
         source: datatypes::error::Error,
     },

-    #[snafu(display("Failed to cast SQL value {} to datatype {}", sql_value, datatype))]
-    InvalidCast {
-        sql_value: sqlparser::ast::Value,
-        datatype: ConcreteDataType,
-        #[snafu(implicit)]
-        location: Location,
-        source: datatypes::error::Error,
-    },
-
-    #[snafu(display("Invalid unary operator {} for value {}", unary_op, value))]
-    InvalidUnaryOp {
-        unary_op: UnaryOperator,
-        value: Value,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display("Unsupported unary operator {}", unary_op))]
-    UnsupportedUnaryOp {
-        unary_op: UnaryOperator,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Unrecognized table option key: {}", key))]
     InvalidTableOption {
         key: String,
@@ -271,25 +232,6 @@ pub enum Error {
         source: api::error::Error,
     },

-    #[snafu(display("Invalid sql value: {}", value))]
-    InvalidSqlValue {
-        value: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
-    #[snafu(display(
-        "Converting timestamp {:?} to unit {:?} overflow",
-        timestamp,
-        target_unit
-    ))]
-    TimestampOverflow {
-        timestamp: Timestamp,
-        target_unit: TimeUnit,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Unable to convert statement {} to DataFusion statement", statement))]
     ConvertToDfStatement {
         statement: String,
@@ -297,14 +239,6 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display("Unable to convert sql value {} to datatype {:?}", value, datatype))]
-    ConvertSqlValue {
-        value: SqlValue,
-        datatype: ConcreteDataType,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Unable to convert value {} to sql value", value))]
     ConvertValue {
         value: Value,
@@ -354,13 +288,6 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display("Datatype error: {}", source))]
-    Datatype {
-        source: datatypes::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display(
         "Invalid partition number: {}, should be in range [2, 65536]",
         partition_num
@@ -371,14 +298,6 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display("Unable to convert {} to datatype {:?}", value, datatype))]
-    ConvertStr {
-        value: String,
-        datatype: ConcreteDataType,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[cfg(feature = "enterprise")]
     #[snafu(display("Missing `{}` clause", name))]
     MissingClause {
@@ -410,6 +329,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Sql common error"))]
+    SqlCommon {
+        source: common_sql::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 impl ErrorExt for Error {
@@ -417,7 +343,7 @@ impl ErrorExt for Error {
         use Error::*;

         match self {
-            UnsupportedDefaultValue { .. } | Unsupported { .. } => StatusCode::Unsupported,
+            Unsupported { .. } => StatusCode::Unsupported,
             Unexpected { .. }
             | Syntax { .. }
             | TQLSyntax { .. }
@@ -441,17 +367,11 @@ impl ErrorExt for Error {
             | InvalidTableName { .. }
             | InvalidFlowName { .. }
             | InvalidFlowQuery { .. }
-            | InvalidSqlValue { .. }
-            | TimestampOverflow { .. }
             | InvalidTableOption { .. }
-            | InvalidCast { .. }
             | ConvertToLogicalExpression { .. }
             | Simplification { .. }
             | InvalidInterval { .. }
-            | InvalidUnaryOp { .. }
-            | InvalidPartitionNumber { .. }
-            | UnsupportedUnaryOp { .. }
-            | ConvertStr { .. } => StatusCode::InvalidArguments,
+            | InvalidPartitionNumber { .. } => StatusCode::InvalidArguments,

             #[cfg(feature = "enterprise")]
             InvalidTriggerName { .. } => StatusCode::InvalidArguments,
@@ -463,9 +383,9 @@ impl ErrorExt for Error {

             SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
             ConvertToGrpcDataType { source, .. } => source.status_code(),
-            Datatype { source, .. } => source.status_code(),
+            SqlCommon { source, .. } => source.status_code(),
             ConvertToDfStatement { .. } => StatusCode::Internal,
-            ConvertSqlValue { .. } | ConvertValue { .. } => StatusCode::Unsupported,
+            ConvertValue { .. } => StatusCode::Unsupported,

             PermissionDenied { .. } => StatusCode::PermissionDenied,
             SetFulltextOption { .. } | SetSkippingIndexOption { .. } => StatusCode::Unexpected,

File diff suppressed because it is too large
@@ -233,3 +233,65 @@ DROP TABLE lightning;

 Affected Rows: 0

+CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
+    `greptime_timestamp` TIMESTAMP(3) NOT NULL,
+    `greptime_value` DOUBLE NULL,
+    `instance` STRING NULL,
+    `job` STRING NULL,
+    TIME INDEX (`greptime_timestamp`),
+    PRIMARY KEY (`instance`, `job`)
+);
+
+Affected Rows: 0
+
+INSERT INTO `instance_job_metrics` VALUES
+    ('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
+    ('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
+    ('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
+
+Affected Rows: 3
+
+TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+
++---------------------+------------------------------------------+
+| greptime_timestamp  | sum(instance_job_metrics.greptime_value) |
++---------------------+------------------------------------------+
+| 2023-10-01T00:00:01 | 1696118400.0                             |
+| 2023-10-01T00:00:02 | 3392236800.0                             |
+| 2023-10-01T00:00:03 | 5088355200.0                             |
+| 2023-10-01T00:00:04 | 5088355200.0                             |
+| 2023-10-01T00:00:05 | 5088355200.0                             |
++---------------------+------------------------------------------+
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (Hash.*) REDACTED
+TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+
++-+-+-+
+| stage | node | plan_|
++-+-+-+
+| 0_| 0_|_MergeScanExec: REDACTED
+|_|_|_|
+| 1_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
+|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
+|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=REDACTED
+|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
+|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
+|_|_|_PromInstantManipulateExec: range=[1696118400000..1696118405000], lookback=[300000], interval=[1000], time index=[greptime_timestamp] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["instance", "job"] REDACTED
+|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
+|_|_|_|
+|_|_| Total rows: 5_|
++-+-+-+
+
+DROP TABLE IF EXISTS `instance_job_metrics`;
+
+Affected Rows: 0
+
@@ -120,3 +120,30 @@ ORDER BY
     true_collect_time DESC;

 DROP TABLE lightning;
+
+CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
+    `greptime_timestamp` TIMESTAMP(3) NOT NULL,
+    `greptime_value` DOUBLE NULL,
+    `instance` STRING NULL,
+    `job` STRING NULL,
+    TIME INDEX (`greptime_timestamp`),
+    PRIMARY KEY (`instance`, `job`)
+);
+
+INSERT INTO `instance_job_metrics` VALUES
+    ('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
+    ('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
+    ('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
+
+TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (Hash.*) REDACTED
+TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+
+DROP TABLE IF EXISTS `instance_job_metrics`;
@@ -234,3 +234,57 @@ drop table test;

 Affected Rows: 0

+CREATE TABLE test2 (
+    "greptime_timestamp" TIMESTAMP(3) NOT NULL,
+    "greptime_value" DOUBLE NULL,
+    "shard" STRING NULL INVERTED INDEX,
+    TIME INDEX ("greptime_timestamp"),
+    PRIMARY KEY ("shard")
+)
+PARTITION ON COLUMNS ("shard") (
+    shard <= '2',
+    shard > '2'
+);
+
+Affected Rows: 0
+
+TQL EVAL sum(test2);
+
+++
+++
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+TQL ANALYZE sum(test2);
+
++-+-+-+
+| stage | node | plan_|
++-+-+-+
+| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
+|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
+|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
+|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
+|_|_|_RepartitionExec: partitioning=Hash([greptime_timestamp@0], 20), input_partitions=20 REDACTED
+|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
+|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
+|_|_|_MergeScanExec: REDACTED
+|_|_|_|
+| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
+|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
+|_|_|_|
+| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
+|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
+|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
+|_|_|_|
+|_|_| Total rows: 0_|
++-+-+-+
+
+DROP TABLE test2;
+
+Affected Rows: 0
+
@@ -95,3 +95,27 @@ TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;
 TQL ANALYZE FORMAT TEXT (0, 10, '5s') test;

 drop table test;
+
+CREATE TABLE test2 (
+    "greptime_timestamp" TIMESTAMP(3) NOT NULL,
+    "greptime_value" DOUBLE NULL,
+    "shard" STRING NULL INVERTED INDEX,
+    TIME INDEX ("greptime_timestamp"),
+    PRIMARY KEY ("shard")
+)
+PARTITION ON COLUMNS ("shard") (
+    shard <= '2',
+    shard > '2'
+);
+
+TQL EVAL sum(test2);
+
+-- SQLNESS REPLACE (metrics.*) REDACTED
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (-+) -
+-- SQLNESS REPLACE (\s\s+) _
+-- SQLNESS REPLACE (peers.*) REDACTED
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+TQL ANALYZE sum(test2);
+
+DROP TABLE test2;