Compare commits

..

19 Commits

Author SHA1 Message Date
discord9
365421c452 Revert "feat: stream drop record metrics"
This reverts commit 3eda4a2257928d95cf9c1328ae44fae84cfbb017.

Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
b361c5f50a chore: more dbg
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
5d78bc1efa test: update sqlness
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
99c78b2f97 fix: expand differently
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
bdecdb869e feat: stream drop record metrics
Signed-off-by: discord9 <discord9@163.com>

refactor: move logging to drop too

Signed-off-by: discord9 <discord9@163.com>

fix: drop input stream before collect metrics

Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
df4cd157e1 Revert "feat: stream drop record metrics"
This reverts commit 6a16946a5b8ea37557bbb1b600847d24274d6500.

Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
60fbe54f90 feat: stream drop record metrics
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
7c15e71407 revert
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
13f20b5a40 add logging to figure test failure
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
Ruihang Xia
2b802e45f5 update sqlness result
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
Ruihang Xia
def9b7c01d fix: expand on conditional commutative as well
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
2025-07-10 17:23:49 +08:00
discord9
af03e89139 fix: stricter win sort condition (#6477)
test: sqlness



test: fix sqlness redacted

Signed-off-by: discord9 <discord9@163.com>
2025-07-08 22:27:17 +00:00
jeremyhi
e7a64b7dc0 chore: refactor register_region method to avoid TOCTOU issues (#6468) 2025-07-08 13:26:38 +00:00
Lin Yihai
29739b556e refactor: split some convert function into sql-common crate (#6452)
refactor: split some convert function into `sql-common` crates

Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>
2025-07-08 12:08:33 +00:00
Lei, HUANG
77e50d0e08 chore: expose some config (#6479)
refactor/expose-config:
 ### Make SubCommand and Fields Public in `frontend.rs`

 - Made `subcmd` field in `Command` struct public.
 - Made `SubCommand` enum public.
 - Made `config_file` and `env_prefix` fields in `StartCommand` struct public.

 These changes enhance the accessibility of command-related structures and fields, facilitating external usage and integration.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-07-08 11:52:23 +00:00
LFC
c2f1447345 refactor: stores the http server builder in Metasrv instance (#6461)
* refactor: stores the http server builder in Metasrv instance

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
2025-07-07 06:39:05 +00:00
Weny Xu
30f7955d2b feat: add column metadata to response extensions (#6451)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-07-07 03:38:13 +00:00
fys
3508fddd74 feat: support show triggers (#6465)
* feat: support show triggers

* add enterprise feature

* chore: remove unused error
2025-07-07 03:30:57 +00:00
Weny Xu
351c741c70 fix(metric-engine): handle stale metadata region recovery failures (#6395)
* fix(metric-engine): handle stale metadata region recovery failures

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-07-07 03:30:40 +00:00
45 changed files with 2405 additions and 1446 deletions

37
Cargo.lock generated
View File

@@ -1975,7 +1975,6 @@ dependencies = [
"common-version",
"common-wal",
"datatypes",
"either",
"etcd-client",
"futures",
"humantime",
@@ -2103,7 +2102,6 @@ dependencies = [
"common-wal",
"datanode",
"datatypes",
"either",
"etcd-client",
"file-engine",
"flow",
@@ -2670,6 +2668,24 @@ dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-sql"
version = "0.16.0"
dependencies = [
"common-base",
"common-datasource",
"common-decimal",
"common-error",
"common-macro",
"common-time",
"datafusion-sql",
"datatypes",
"hex",
"jsonb",
"snafu 0.8.5",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
]
[[package]]
name = "common-telemetry"
version = "0.16.0"
@@ -4201,9 +4217,9 @@ dependencies = [
[[package]]
name = "either"
version = "1.13.0"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
dependencies = [
"serde",
]
@@ -6699,7 +6715,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]
[[package]]
@@ -7172,6 +7188,7 @@ dependencies = [
"deadpool",
"deadpool-postgres",
"derive_builder 0.20.1",
"either",
"etcd-client",
"futures",
"h2 0.3.26",
@@ -7243,6 +7260,7 @@ dependencies = [
"common-base",
"common-error",
"common-macro",
"common-meta",
"common-query",
"common-recordbatch",
"common-runtime",
@@ -8457,6 +8475,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"common-runtime",
"common-sql",
"common-telemetry",
"common-test-util",
"common-time",
@@ -9594,7 +9613,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
dependencies = [
"heck 0.5.0",
"itertools 0.11.0",
"itertools 0.14.0",
"log",
"multimap",
"once_cell",
@@ -9640,7 +9659,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
dependencies = [
"anyhow",
"itertools 0.11.0",
"itertools 0.14.0",
"proc-macro2",
"quote",
"syn 2.0.100",
@@ -11243,6 +11262,7 @@ dependencies = [
"common-recordbatch",
"common-runtime",
"common-session",
"common-sql",
"common-telemetry",
"common-test-util",
"common-time",
@@ -11681,6 +11701,7 @@ dependencies = [
"common-error",
"common-macro",
"common-query",
"common-sql",
"common-time",
"datafusion",
"datafusion-common",
@@ -14266,7 +14287,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.48.0",
"windows-sys 0.59.0",
]
[[package]]

View File

@@ -30,6 +30,7 @@ members = [
"src/common/recordbatch",
"src/common/runtime",
"src/common/session",
"src/common/sql",
"src/common/stat",
"src/common/substrait",
"src/common/telemetry",
@@ -133,6 +134,7 @@ deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
dotenv = "0.15"
either = "1.15"
etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
@@ -261,6 +263,7 @@ common-query = { path = "src/common/query" }
common-recordbatch = { path = "src/common/recordbatch" }
common-runtime = { path = "src/common/runtime" }
common-session = { path = "src/common/session" }
common-sql = { path = "src/common/sql" }
common-telemetry = { path = "src/common/telemetry" }
common-test-util = { path = "src/common/test-util" }
common-time = { path = "src/common/time" }

View File

@@ -43,7 +43,6 @@ common-time.workspace = true
common-version.workspace = true
common-wal.workspace = true
datatypes.workspace = true
either = "1.8"
etcd-client.workspace = true
futures.workspace = true
humantime.workspace = true

View File

@@ -52,7 +52,6 @@ common-version.workspace = true
common-wal.workspace = true
datanode.workspace = true
datatypes.workspace = true
either = "1.8"
etcd-client.workspace = true
file-engine.workspace = true
flow.workspace = true

View File

@@ -102,7 +102,7 @@ impl App for Instance {
#[derive(Parser)]
pub struct Command {
#[clap(subcommand)]
subcmd: SubCommand,
pub subcmd: SubCommand,
}
impl Command {
@@ -116,7 +116,7 @@ impl Command {
}
#[derive(Parser)]
enum SubCommand {
pub enum SubCommand {
Start(StartCommand),
}
@@ -153,7 +153,7 @@ pub struct StartCommand {
#[clap(long)]
postgres_addr: Option<String>,
#[clap(short, long)]
config_file: Option<String>,
pub config_file: Option<String>,
#[clap(short, long)]
influxdb_enable: Option<bool>,
#[clap(long, value_delimiter = ',', num_args = 1..)]
@@ -169,7 +169,7 @@ pub struct StartCommand {
#[clap(long)]
disable_dashboard: Option<bool>,
#[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
env_prefix: String,
pub env_prefix: String,
}
impl StartCommand {

View File

@@ -54,6 +54,10 @@ impl Instance {
pub fn get_inner(&self) -> &MetasrvInstance {
&self.instance
}
pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
&mut self.instance
}
}
#[async_trait]

22
src/common/sql/Cargo.toml Normal file
View File

@@ -0,0 +1,22 @@
[package]
name = "common-sql"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
common-base.workspace = true
common-datasource.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-time.workspace = true
datafusion-sql.workspace = true
datatypes.workspace = true
hex = "0.4"
jsonb.workspace = true
snafu.workspace = true
sqlparser.workspace = true
[lints]
workspace = true

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,182 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_time::timezone::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::ColumnDefaultConstraint;
pub use sqlparser::ast::{
visit_expressions_mut, visit_statements_mut, BinaryOperator, ColumnDef, ColumnOption,
ColumnOptionDef, DataType, Expr, Function, FunctionArg, FunctionArgExpr, FunctionArguments,
Ident, ObjectName, SqlOption, TableConstraint, TimezoneInfo, UnaryOperator, Value as SqlValue,
Visit, VisitMut, Visitor, VisitorMut,
};
use crate::convert::{sql_number_to_value, sql_value_to_value};
use crate::error::{Result, UnsupportedDefaultValueSnafu};
pub fn parse_column_default_constraint(
column_name: &str,
data_type: &ConcreteDataType,
opts: &[ColumnOptionDef],
timezone: Option<&Timezone>,
) -> Result<Option<ColumnDefaultConstraint>> {
if let Some(opt) = opts
.iter()
.find(|o| matches!(o.option, ColumnOption::Default(_)))
{
let default_constraint = match &opt.option {
ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value(
sql_value_to_value(column_name, data_type, v, timezone, None, false)?,
),
ColumnOption::Default(Expr::Function(func)) => {
let mut func = format!("{func}").to_lowercase();
// normalize CURRENT_TIMESTAMP to CURRENT_TIMESTAMP()
if func == CURRENT_TIMESTAMP {
func = CURRENT_TIMESTAMP_FN.to_string();
}
// Always use lowercase for function expression
ColumnDefaultConstraint::Function(func.to_lowercase())
}
ColumnOption::Default(Expr::UnaryOp { op, expr }) => {
// Specialized process for handling numerical inputs to prevent
// overflow errors during the parsing of negative numbers,
// See https://github.com/GreptimeTeam/greptimedb/issues/4351
if let (UnaryOperator::Minus, Expr::Value(SqlValue::Number(n, _))) =
(op, expr.as_ref())
{
return Ok(Some(ColumnDefaultConstraint::Value(sql_number_to_value(
data_type,
&format!("-{n}"),
)?)));
}
if let Expr::Value(v) = &**expr {
let value =
sql_value_to_value(column_name, data_type, v, timezone, Some(*op), false)?;
ColumnDefaultConstraint::Value(value)
} else {
return UnsupportedDefaultValueSnafu {
column_name,
expr: *expr.clone(),
}
.fail();
}
}
ColumnOption::Default(others) => {
return UnsupportedDefaultValueSnafu {
column_name,
expr: others.clone(),
}
.fail();
}
_ => {
return UnsupportedDefaultValueSnafu {
column_name,
expr: Expr::Value(SqlValue::Null),
}
.fail();
}
};
Ok(Some(default_constraint))
} else {
Ok(None)
}
}
#[cfg(test)]
mod test {
use std::assert_matches::assert_matches;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::types::BooleanType;
use super::*;
#[test]
pub fn test_parse_column_default_constraint() {
let bool_value = sqlparser::ast::Value::Boolean(true);
let opts = vec![
ColumnOptionDef {
name: None,
option: ColumnOption::Default(Expr::Value(bool_value)),
},
ColumnOptionDef {
name: None,
option: ColumnOption::NotNull,
},
];
let constraint = parse_column_default_constraint(
"coll",
&ConcreteDataType::Boolean(BooleanType),
&opts,
None,
)
.unwrap();
assert_matches!(
constraint,
Some(ColumnDefaultConstraint::Value(Value::Boolean(true)))
);
// Test negative number
let opts = vec![ColumnOptionDef {
name: None,
option: ColumnOption::Default(Expr::UnaryOp {
op: UnaryOperator::Minus,
expr: Box::new(Expr::Value(SqlValue::Number("32768".to_string(), false))),
}),
}];
let constraint = parse_column_default_constraint(
"coll",
&ConcreteDataType::int16_datatype(),
&opts,
None,
)
.unwrap();
assert_matches!(
constraint,
Some(ColumnDefaultConstraint::Value(Value::Int16(-32768)))
);
}
#[test]
fn test_incorrect_default_value_issue_3479() {
let opts = vec![ColumnOptionDef {
name: None,
option: ColumnOption::Default(Expr::Value(SqlValue::Number(
"0.047318541668048164".into(),
false,
))),
}];
let constraint = parse_column_default_constraint(
"coll",
&ConcreteDataType::float64_datatype(),
&opts,
None,
)
.unwrap()
.unwrap();
assert_eq!("0.047318541668048164", constraint.to_string());
let encoded: Vec<u8> = constraint.clone().try_into().unwrap();
let decoded = ColumnDefaultConstraint::try_from(encoded.as_ref()).unwrap();
assert_eq!(decoded, constraint);
}
}

158
src/common/sql/src/error.rs Normal file
View File

@@ -0,0 +1,158 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_sql::sqlparser::ast::UnaryOperator;
use datatypes::prelude::{ConcreteDataType, Value};
use snafu::{Location, Snafu};
pub use sqlparser::ast::{Expr, Value as SqlValue};
pub type Result<T> = std::result::Result<T, Error>;
/// SQL parser errors.
// Now the error in parser does not contain backtrace to avoid generating backtrace
// every time the parser parses an invalid SQL.
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display(
"Column {} expect type: {:?}, actual: {:?}",
column_name,
expect,
actual,
))]
ColumnTypeMismatch {
column_name: String,
expect: ConcreteDataType,
actual: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse value: {}", msg))]
ParseSqlValue {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Unsupported expr in default constraint: {:?} for column: {}",
expr,
column_name
))]
UnsupportedDefaultValue {
column_name: String,
expr: Expr,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unable to convert sql value {} to datatype {:?}", value, datatype))]
ConvertSqlValue {
value: SqlValue,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid sql value: {}", value))]
InvalidSqlValue {
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported unary operator {}", unary_op))]
UnsupportedUnaryOp {
unary_op: UnaryOperator,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid unary operator {} for value {}", unary_op, value))]
InvalidUnaryOp {
unary_op: UnaryOperator,
value: Value,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to cast SQL value {} to datatype {}", sql_value, datatype))]
InvalidCast {
sql_value: sqlparser::ast::Value,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Unable to convert {} to datatype {:?}", value, datatype))]
ConvertStr {
value: String,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Converting timestamp {:?} to unit {:?} overflow",
timestamp,
target_unit
))]
TimestampOverflow {
timestamp: Timestamp,
target_unit: TimeUnit,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Datatype error: {}", source))]
Datatype {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
UnsupportedDefaultValue { .. } => StatusCode::Unsupported,
ParseSqlValue { .. } => StatusCode::InvalidSyntax,
ColumnTypeMismatch { .. }
| InvalidSqlValue { .. }
| UnsupportedUnaryOp { .. }
| InvalidUnaryOp { .. }
| InvalidCast { .. }
| ConvertStr { .. }
| TimestampOverflow { .. } => StatusCode::InvalidArguments,
Datatype { source, .. } => source.status_code(),
ConvertSqlValue { .. } => StatusCode::Unsupported,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

19
src/common/sql/src/lib.rs Normal file
View File

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
pub mod convert;
pub mod default_constraint;
pub mod error;

View File

@@ -86,26 +86,33 @@ impl RegionAliveKeeper {
/// Add the countdown task for a specific region.
/// It will be ignored if the task exists.
pub async fn register_region(&self, region_id: RegionId) {
if self.find_handle(region_id).await.is_some() {
return;
}
let handle = Arc::new(CountdownTaskHandle::new(
self.region_server.clone(),
self.countdown_task_handler_ext.clone(),
region_id,
));
let mut handles = self.tasks.lock().await;
let _ = handles.insert(region_id, handle.clone());
let should_start = {
let mut handles = self.tasks.lock().await;
if self.started.load(Ordering::Relaxed) {
// Check if already exists, return early if so
if handles.contains_key(&region_id) {
return;
}
// Insert new handle
handles.insert(region_id, handle.clone());
// Return whether we should start (check state inside lock)
self.started.load(Ordering::Relaxed)
};
if should_start {
handle.start(self.heartbeat_interval_millis).await;
info!("Region alive countdown for region {region_id} is started!",);
info!("Region alive countdown for region {region_id} is started!");
} else {
info!(
"Region alive countdown for region {region_id} is registered but not started yet!",
"Region alive countdown for region {region_id} is registered but not started yet!"
);
}
}

View File

@@ -45,6 +45,7 @@ datatypes.workspace = true
deadpool = { workspace = true, optional = true }
deadpool-postgres = { workspace = true, optional = true }
derive_builder.workspace = true
either.workspace = true
etcd-client.workspace = true
futures.workspace = true
h2 = "0.3"

View File

@@ -34,6 +34,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
use either::Either;
use etcd_client::Client;
use servers::configurator::ConfiguratorRef;
use servers::export_metrics::ExportMetricsTask;
@@ -76,7 +77,7 @@ use crate::{error, Result};
pub struct MetasrvInstance {
metasrv: Arc<Metasrv>,
http_server: HttpServer,
http_server: Either<Option<HttpServerBuilder>, HttpServer>,
opts: MetasrvOptions,
@@ -99,10 +100,9 @@ impl MetasrvInstance {
plugins: Plugins,
metasrv: Metasrv,
) -> Result<MetasrvInstance> {
let http_server = HttpServerBuilder::new(opts.http.clone())
let builder = HttpServerBuilder::new(opts.http.clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?)
.build();
.with_greptime_config_options(opts.to_toml().context(error::TomlFormatSnafu)?);
let metasrv = Arc::new(metasrv);
// put metasrv into plugins for later use
@@ -111,7 +111,7 @@ impl MetasrvInstance {
.context(error::InitExportMetricsTaskSnafu)?;
Ok(MetasrvInstance {
metasrv,
http_server,
http_server: Either::Left(Some(builder)),
opts,
signal_sender: None,
plugins,
@@ -122,6 +122,25 @@ impl MetasrvInstance {
}
pub async fn start(&mut self) -> Result<()> {
if let Some(builder) = self.http_server.as_mut().left()
&& let Some(builder) = builder.take()
{
let mut server = builder.build();
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
info!("starting http server at {}", addr);
server.start(addr).await.context(error::StartHttpSnafu)?;
self.http_server = Either::Right(server);
} else {
// If the http server builder is not present, the Metasrv has to be called "start"
// already, regardless of the startup was successful or not. Return an `Ok` here for
// simplicity.
return Ok(());
};
self.metasrv.try_start().await?;
if let Some(t) = self.export_metrics_task.as_ref() {
@@ -144,14 +163,6 @@ impl MetasrvInstance {
.await?;
self.bind_addr = Some(socket_addr);
let addr = self.opts.http.addr.parse().context(error::ParseAddrSnafu {
addr: &self.opts.http.addr,
})?;
self.http_server
.start(addr)
.await
.context(error::StartHttpSnafu)?;
*self.serve_state.lock().await = Some(serve_state_rx);
Ok(())
}
@@ -169,12 +180,15 @@ impl MetasrvInstance {
.context(error::SendShutdownSignalSnafu)?;
}
self.metasrv.shutdown().await?;
self.http_server
.shutdown()
.await
.context(error::ShutdownServerSnafu {
server: self.http_server.name(),
})?;
if let Some(http_server) = self.http_server.as_ref().right() {
http_server
.shutdown()
.await
.context(error::ShutdownServerSnafu {
server: http_server.name(),
})?;
}
Ok(())
}
@@ -188,6 +202,14 @@ impl MetasrvInstance {
pub fn bind_addr(&self) -> &Option<SocketAddr> {
&self.bind_addr
}
pub fn mut_http_server(&mut self) -> &mut Either<Option<HttpServerBuilder>, HttpServer> {
&mut self.http_server
}
pub fn http_server(&self) -> Option<&HttpServer> {
self.http_server.as_ref().right()
}
}
pub async fn bootstrap_metasrv_with_router(

View File

@@ -15,6 +15,7 @@
#![feature(result_flattening)]
#![feature(assert_matches)]
#![feature(hash_set_entry)]
#![feature(let_chains)]
pub mod bootstrap;
pub mod cache_invalidator;

View File

@@ -40,5 +40,6 @@ store-api.workspace = true
tokio.workspace = true
[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
mito2 = { workspace = true, features = ["test"] }

View File

@@ -477,8 +477,9 @@ struct MetricEngineInner {
mod test {
use std::collections::HashMap;
use common_telemetry::info;
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
use store_api::region_request::{RegionCloseRequest, RegionFlushRequest, RegionOpenRequest};
use super::*;
use crate::test_util::TestEnv;
@@ -563,4 +564,90 @@ mod test {
assert!(env.metric().region_statistic(logical_region_id).is_none());
assert!(env.metric().region_statistic(physical_region_id).is_some());
}
#[tokio::test]
async fn test_open_region_failure() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let physical_region_id = env.default_physical_region_id();
let metric_engine = env.metric();
metric_engine
.handle_request(
physical_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
let path = format!("{}/metadata/", env.default_region_dir());
let object_store = env.get_object_store().unwrap();
let list = object_store.list(&path).await.unwrap();
// Delete parquet files in metadata region
for entry in list {
if entry.metadata().is_dir() {
continue;
}
if entry.name().ends_with("parquet") {
info!("deleting {}", entry.path());
object_store.delete(entry.path()).await.unwrap();
}
}
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
let open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: physical_region_option,
skip_wal_replay: false,
};
// Opening an already opened region should succeed.
// Since the region is already open, no metadata recovery operations will be performed.
metric_engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
.await
.unwrap();
// Close the region
metric_engine
.handle_request(
physical_region_id,
RegionRequest::Close(RegionCloseRequest {}),
)
.await
.unwrap();
// Try to reopen region.
let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect();
let open_request = RegionOpenRequest {
engine: METRIC_ENGINE_NAME.to_string(),
region_dir: env.default_region_dir(),
options: physical_region_option,
skip_wal_replay: false,
};
let err = metric_engine
.handle_request(physical_region_id, RegionRequest::Open(open_request))
.await
.unwrap_err();
// Failed to open region because of missing parquet files.
assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
let mito_engine = metric_engine.mito();
let data_region_id = utils::to_data_region_id(physical_region_id);
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
// The metadata/data region should be closed.
let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotFound);
let err = mito_engine
.get_metadata(metadata_region_id)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RegionNotFound);
}
}

View File

@@ -66,7 +66,7 @@ impl MetricEngineInner {
let mut manifest_infos = Vec::with_capacity(1);
self.alter_logical_regions(physical_region_id, requests, extension_return_value)
.await?;
append_manifest_info(&self.mito, region_id, &mut manifest_infos);
append_manifest_info(&self.mito, physical_region_id, &mut manifest_infos);
encode_manifest_info_to_extensions(&manifest_infos, extension_return_value)?;
} else {
let grouped_requests =
@@ -222,13 +222,17 @@ mod test {
use std::time::Duration;
use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{AddColumn, SetRegionOption};
use common_meta::ddl::test_util::assert_column_name_and_id;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AlterKind, BatchRegionDdlRequest, RegionAlterRequest, SetRegionOption,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;
use super::*;
use crate::test_util::TestEnv;
use crate::test_util::{alter_logical_region_request, create_logical_region_request, TestEnv};
#[tokio::test]
async fn test_alter_region() {
@@ -239,22 +243,7 @@ mod test {
// alter physical region
let physical_region_id = env.default_physical_region_id();
let request = RegionAlterRequest {
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag1",
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
}],
},
};
let request = alter_logical_region_request(&["tag1"]);
let result = engine_inner
.alter_physical_region(physical_region_id, request.clone())
@@ -287,14 +276,18 @@ mod test {
assert!(!is_column_exist);
let region_id = env.default_logical_region_id();
engine_inner
.alter_logical_regions(
physical_region_id,
vec![(region_id, request)],
&mut HashMap::new(),
)
let response = env
.metric()
.handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![(
region_id,
request.clone(),
)]))
.await
.unwrap();
let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
assert_eq!(manifest_infos[0].0, physical_region_id);
assert!(manifest_infos[0].1.is_metric());
let semantic_type = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "tag1")
.await
@@ -307,5 +300,77 @@ mod test {
.unwrap()
.unwrap();
assert_eq!(timestamp_index, SemanticType::Timestamp);
let column_metadatas =
parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),
("tag1", 3),
],
);
}
#[tokio::test]
async fn test_alter_logical_regions() {
let env = TestEnv::new().await;
let engine = env.metric();
let physical_region_id1 = RegionId::new(1024, 0);
let physical_region_id2 = RegionId::new(1024, 1);
let logical_region_id1 = RegionId::new(1025, 0);
let logical_region_id2 = RegionId::new(1025, 1);
env.create_physical_region(physical_region_id1, "/test_dir1")
.await;
env.create_physical_region(physical_region_id2, "/test_dir2")
.await;
let region_create_request1 = crate::test_util::create_logical_region_request(
&["job"],
physical_region_id1,
"logical1",
);
let region_create_request2 =
create_logical_region_request(&["job"], physical_region_id2, "logical2");
engine
.handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
(logical_region_id1, region_create_request1),
(logical_region_id2, region_create_request2),
]))
.await
.unwrap();
let region_alter_request1 = alter_logical_region_request(&["tag1"]);
let region_alter_request2 = alter_logical_region_request(&["tag1"]);
let response = engine
.handle_batch_ddl_requests(BatchRegionDdlRequest::Alter(vec![
(logical_region_id1, region_alter_request1),
(logical_region_id2, region_alter_request2),
]))
.await
.unwrap();
let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
assert_eq!(manifest_infos.len(), 2);
let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
assert!(region_ids.contains(&physical_region_id1));
assert!(region_ids.contains(&physical_region_id2));
let column_metadatas =
parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),
("tag1", 3),
],
);
}
}

View File

@@ -59,7 +59,7 @@ impl MetricEngineInner {
}
}
async fn close_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
pub(crate) async fn close_physical_region(&self, region_id: RegionId) -> Result<AffectedRows> {
let data_region_id = utils::to_data_region_id(region_id);
let metadata_region_id = utils::to_metadata_region_id(region_id);

View File

@@ -80,7 +80,8 @@ impl MetricEngineInner {
}
);
let (region_id, request) = requests.pop().unwrap();
self.create_physical_region(region_id, request).await?;
self.create_physical_region(region_id, request, extension_return_value)
.await?;
return Ok(0);
} else if first_request
@@ -122,6 +123,7 @@ impl MetricEngineInner {
&self,
region_id: RegionId,
request: RegionCreateRequest,
extension_return_value: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
@@ -162,7 +164,8 @@ impl MetricEngineInner {
.context(UnexpectedRequestSnafu {
reason: "No time index column found",
})?;
self.mito
let response = self
.mito
.handle_request(
data_region_id,
RegionRequest::Create(create_data_region_request),
@@ -176,6 +179,7 @@ impl MetricEngineInner {
region_id: data_region_id,
},
)?;
extension_return_value.extend(response.extensions);
info!("Created physical metric region {region_id}, primary key encoding={primary_key_encoding}, physical_region_options={physical_region_options:?}");
PHYSICAL_REGION_COUNT.inc();
@@ -613,12 +617,15 @@ pub(crate) fn region_options_for_metadata_region(
#[cfg(test)]
mod test {
use common_meta::ddl::test_util::assert_column_name_and_id;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use store_api::region_request::BatchRegionDdlRequest;
use super::*;
use crate::config::EngineConfig;
use crate::engine::MetricEngine;
use crate::test_util::TestEnv;
use crate::test_util::{create_logical_region_request, TestEnv};
#[test]
fn test_verify_region_create_request() {
@@ -807,4 +814,50 @@ mod test {
);
assert!(!metadata_region_request.options.contains_key("skip_wal"));
}
#[tokio::test]
async fn test_create_logical_regions() {
let env = TestEnv::new().await;
let engine = env.metric();
let physical_region_id1 = RegionId::new(1024, 0);
let physical_region_id2 = RegionId::new(1024, 1);
let logical_region_id1 = RegionId::new(1025, 0);
let logical_region_id2 = RegionId::new(1025, 1);
env.create_physical_region(physical_region_id1, "/test_dir1")
.await;
env.create_physical_region(physical_region_id2, "/test_dir2")
.await;
let region_create_request1 =
create_logical_region_request(&["job"], physical_region_id1, "logical1");
let region_create_request2 =
create_logical_region_request(&["job"], physical_region_id2, "logical2");
let response = engine
.handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
(logical_region_id1, region_create_request1),
(logical_region_id2, region_create_request2),
]))
.await
.unwrap();
let manifest_infos = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
assert_eq!(manifest_infos.len(), 2);
let region_ids = manifest_infos.into_iter().map(|i| i.0).collect::<Vec<_>>();
assert!(region_ids.contains(&physical_region_id1));
assert!(region_ids.contains(&physical_region_id2));
let column_metadatas =
parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
assert_column_name_and_id(
&column_metadatas,
&[
("greptime_timestamp", 0),
("greptime_value", 1),
("__table_id", ReservedColumnId::table_id()),
("__tsid", ReservedColumnId::tsid()),
("job", 2),
],
);
}
}

View File

@@ -17,7 +17,7 @@
use api::region::RegionResponse;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::info;
use common_telemetry::{error, info, warn};
use datafusion::common::HashMap;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
@@ -94,6 +94,21 @@ impl MetricEngineInner {
Ok(responses)
}
// If the metadata region is opened with a stale manifest,
// the metric engine may fail to recover logical tables from the metadata region,
// as the manifest could reference files that have already been deleted
// due to compaction operations performed by the region leader.
async fn close_physical_region_on_recovery_failure(&self, physical_region_id: RegionId) {
info!(
"Closing metadata region {} and data region {} on metadata recovery failure",
utils::to_metadata_region_id(physical_region_id),
utils::to_data_region_id(physical_region_id)
);
if let Err(err) = self.close_physical_region(physical_region_id).await {
error!(err; "Failed to close physical region {}", physical_region_id);
}
}
async fn open_physical_region_with_results(
&self,
metadata_region_result: Option<std::result::Result<RegionResponse, BoxedError>>,
@@ -119,8 +134,14 @@ impl MetricEngineInner {
region_type: "data",
})?;
self.recover_states(physical_region_id, physical_region_options)
.await?;
if let Err(err) = self
.recover_states(physical_region_id, physical_region_options)
.await
{
self.close_physical_region_on_recovery_failure(physical_region_id)
.await;
return Err(err);
}
Ok(data_region_response)
}
@@ -139,11 +160,31 @@ impl MetricEngineInner {
request: RegionOpenRequest,
) -> Result<AffectedRows> {
if request.is_physical_table() {
if self
.state
.read()
.unwrap()
.physical_region_states()
.get(&region_id)
.is_some()
{
warn!(
"The physical region {} is already open, ignore the open request",
region_id
);
return Ok(0);
}
// open physical region and recover states
let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
self.open_physical_region(region_id, request).await?;
self.recover_states(region_id, physical_region_options)
.await?;
if let Err(err) = self
.recover_states(region_id, physical_region_options)
.await
{
self.close_physical_region_on_recovery_failure(region_id)
.await;
return Err(err);
}
Ok(0)
} else {

View File

@@ -16,6 +16,7 @@
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, SemanticType, Value};
use common_meta::ddl::utils::parse_column_metadatas;
use common_telemetry::debug;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -23,14 +24,17 @@ use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::test_util::TestEnv as MitoTestEnv;
use object_store::util::join_dir;
use object_store::ObjectStore;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
ALTER_PHYSICAL_EXTENSION_KEY, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
PHYSICAL_TABLE_METADATA_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{ColumnId, RegionId};
use crate::config::EngineConfig;
@@ -74,6 +78,10 @@ impl TestEnv {
join_dir(&env_root, "data")
}
pub fn get_object_store(&self) -> Option<ObjectStore> {
self.mito_env.get_object_store()
}
/// Returns a reference to the engine.
pub fn mito(&self) -> MitoEngine {
self.mito.clone()
@@ -111,13 +119,8 @@ impl TestEnv {
(mito, metric)
}
/// Create regions in [MetricEngine] under [`default_region_id`]
/// and region dir `"test_metric_region"`.
///
/// This method will create one logical region with three columns `(ts, val, job)`
/// under [`default_logical_region_id`].
pub async fn init_metric_region(&self) {
let region_id = self.default_physical_region_id();
/// Create regions in [MetricEngine] with specific `physical_region_id`.
pub async fn create_physical_region(&self, physical_region_id: RegionId, region_dir: &str) {
let region_create_request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![
@@ -144,26 +147,88 @@ impl TestEnv {
options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect(),
region_dir: self.default_region_dir(),
region_dir: region_dir.to_string(),
};
// create physical region
self.metric()
.handle_request(region_id, RegionRequest::Create(region_create_request))
let response = self
.metric()
.handle_request(
physical_region_id,
RegionRequest::Create(region_create_request),
)
.await
.unwrap();
let column_metadatas =
parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY)
.unwrap();
assert_eq!(column_metadatas.len(), 4);
}
// create logical region
let region_id = self.default_logical_region_id();
/// Create logical region in [MetricEngine] with specific `physical_region_id` and `logical_region_id`.
pub async fn create_logical_region(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) {
let region_create_request = create_logical_region_request(
&["job"],
self.default_physical_region_id(),
physical_region_id,
"test_metric_logical_region",
);
self.metric()
.handle_request(region_id, RegionRequest::Create(region_create_request))
let response = self
.metric()
.handle_request(
logical_region_id,
RegionRequest::Create(region_create_request),
)
.await
.unwrap();
let column_metadatas =
parse_column_metadatas(&response.extensions, ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
assert_eq!(column_metadatas.len(), 5);
let column_names = column_metadatas
.iter()
.map(|c| c.column_schema.name.as_str())
.collect::<Vec<_>>();
let column_ids = column_metadatas
.iter()
.map(|c| c.column_id)
.collect::<Vec<_>>();
assert_eq!(
column_names,
vec![
"greptime_timestamp",
"greptime_value",
"__table_id",
"__tsid",
"job",
]
);
assert_eq!(
column_ids,
vec![
0,
1,
ReservedColumnId::table_id(),
ReservedColumnId::tsid(),
2,
]
);
}
/// Create regions in [MetricEngine] under [`default_region_id`]
/// and region dir `"test_metric_region"`.
///
/// This method will create one logical region with three columns `(ts, val, job)`
/// under [`default_logical_region_id`].
pub async fn init_metric_region(&self) {
let physical_region_id = self.default_physical_region_id();
self.create_physical_region(physical_region_id, &self.default_region_dir())
.await;
let logical_region_id = self.default_logical_region_id();
self.create_logical_region(physical_region_id, logical_region_id)
.await;
}
pub fn metadata_region(&self) -> MetadataRegion {
@@ -269,6 +334,30 @@ pub fn create_logical_region_request(
}
}
/// Generate a [RegionAlterRequest] for logical region.
/// Only need to specify tag column's name
pub fn alter_logical_region_request(tags: &[&str]) -> RegionAlterRequest {
RegionAlterRequest {
kind: AlterKind::AddColumns {
columns: tags
.iter()
.map(|tag| AddColumn {
column_metadata: ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
tag.to_string(),
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
})
.collect::<Vec<_>>(),
},
}
}
/// Generate a row schema with given tag columns.
///
/// The result will also contains default timestamp and value column at beginning.

View File

@@ -80,8 +80,10 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::MANIFEST_INFO_EXTENSION_KEY;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::metric_engine_consts::{
MANIFEST_INFO_EXTENSION_KEY, TABLE_COLUMN_METADATA_EXTENSION_KEY,
};
use store_api::region_engine::{
BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, SyncManifestResponse,
@@ -95,7 +97,7 @@ use crate::cache::CacheStrategy;
use crate::config::MitoConfig;
use crate::error::{
InvalidRequestSnafu, JoinSnafu, MitoManifestInfoSnafu, RecvSnafu, RegionNotFoundSnafu, Result,
SerdeJsonSnafu,
SerdeJsonSnafu, SerializeColumnMetadataSnafu,
};
#[cfg(feature = "enterprise")]
use crate::extension::BoxedExtensionRangeProviderFactory;
@@ -335,6 +337,22 @@ impl MitoEngine {
Ok(())
}
fn encode_column_metadatas_to_extensions(
region_id: &RegionId,
column_metadatas: Vec<ColumnMetadata>,
extensions: &mut HashMap<String, Vec<u8>>,
) -> Result<()> {
extensions.insert(
TABLE_COLUMN_METADATA_EXTENSION_KEY.to_string(),
ColumnMetadata::encode_list(&column_metadatas).context(SerializeColumnMetadataSnafu)?,
);
info!(
"Added column metadatas: {:?} to extensions, region_id: {:?}",
column_metadatas, region_id
);
Ok(())
}
/// Find the current version's memtables and SSTs stats by region_id.
/// The stats must be collected in one place one time to ensure data consistency.
pub fn find_memtable_and_sst_stats(
@@ -695,6 +713,7 @@ impl RegionEngine for MitoEngine {
.start_timer();
let is_alter = matches!(request, RegionRequest::Alter(_));
let is_create = matches!(request, RegionRequest::Create(_));
let mut response = self
.inner
.handle_request(region_id, request)
@@ -703,14 +722,11 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)?;
if is_alter {
if let Some(statistic) = self.region_statistic(region_id) {
Self::encode_manifest_info_to_extensions(
&region_id,
statistic.manifest,
&mut response.extensions,
)
self.handle_alter_response(region_id, &mut response)
.map_err(BoxedError::new)?;
} else if is_create {
self.handle_create_response(region_id, &mut response)
.map_err(BoxedError::new)?;
}
}
Ok(response)
@@ -803,6 +819,55 @@ impl RegionEngine for MitoEngine {
}
}
impl MitoEngine {
fn handle_alter_response(
&self,
region_id: RegionId,
response: &mut RegionResponse,
) -> Result<()> {
if let Some(statistic) = self.region_statistic(region_id) {
Self::encode_manifest_info_to_extensions(
&region_id,
statistic.manifest,
&mut response.extensions,
)?;
}
let column_metadatas = self
.inner
.find_region(region_id)
.ok()
.map(|r| r.metadata().column_metadatas.clone());
if let Some(column_metadatas) = column_metadatas {
Self::encode_column_metadatas_to_extensions(
&region_id,
column_metadatas,
&mut response.extensions,
)?;
}
Ok(())
}
fn handle_create_response(
&self,
region_id: RegionId,
response: &mut RegionResponse,
) -> Result<()> {
let column_metadatas = self
.inner
.find_region(region_id)
.ok()
.map(|r| r.metadata().column_metadatas.clone());
if let Some(column_metadatas) = column_metadatas {
Self::encode_column_metadatas_to_extensions(
&region_id,
column_metadatas,
&mut response.extensions,
)?;
}
Ok(())
}
}
// Tests methods.
#[cfg(any(test, feature = "test"))]
#[allow(clippy::too_many_arguments)]

View File

@@ -20,16 +20,18 @@ use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions};
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::metric_engine_consts::TABLE_COLUMN_METADATA_EXTENSION_KEY;
use store_api::region_engine::{RegionEngine, RegionManifestInfo, RegionRole};
use store_api::region_request::{
AddColumn, AddColumnLocation, AlterKind, ApiSetIndexOptions, RegionAlterRequest,
RegionOpenRequest, RegionRequest, SetRegionOption,
};
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::{ColumnId, RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
@@ -113,6 +115,17 @@ fn check_region_version(
assert_eq!(flushed_sequence, version_data.version.flushed_sequence);
}
fn assert_column_metadatas(column_name: &[(&str, ColumnId)], column_metadatas: &[ColumnMetadata]) {
assert_eq!(column_name.len(), column_metadatas.len());
for (name, id) in column_name {
let column_metadata = column_metadatas
.iter()
.find(|c| c.column_id == *id)
.unwrap();
assert_eq!(column_metadata.column_schema.name, *name);
}
}
#[tokio::test]
async fn test_alter_region() {
common_telemetry::init_default_ut_logging();
@@ -136,10 +149,16 @@ async fn test_alter_region() {
let column_schemas = rows_schema(&request);
let region_dir = request.region_dir.clone();
engine
let response = engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let column_metadatas =
parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY).unwrap();
assert_column_metadatas(
&[("tag_0", 0), ("field_0", 1), ("ts", 2)],
&column_metadatas,
);
let rows = Rows {
schema: column_schemas,
@@ -148,7 +167,7 @@ async fn test_alter_region() {
put_rows(&engine, region_id, rows).await;
let request = add_tag1();
engine
let response = engine
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();
@@ -164,6 +183,18 @@ async fn test_alter_region() {
scan_check_after_alter(&engine, region_id, expected).await;
check_region_version(&engine, region_id, 1, 3, 1, 3);
let mut manifests = parse_manifest_infos_from_extensions(&response.extensions).unwrap();
assert_eq!(manifests.len(), 1);
let (return_region_id, manifest) = manifests.remove(0);
assert_eq!(return_region_id, region_id);
assert_eq!(manifest, RegionManifestInfo::mito(2, 1));
let column_metadatas =
parse_column_metadatas(&response.extensions, TABLE_COLUMN_METADATA_EXTENSION_KEY).unwrap();
assert_column_metadatas(
&[("tag_0", 0), ("field_0", 1), ("ts", 2), ("tag_1", 3)],
&column_metadatas,
);
// Reopen region.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine

View File

@@ -96,6 +96,14 @@ pub enum Error {
error: serde_json::Error,
},
#[snafu(display("Failed to serialize column metadata"))]
SerializeColumnMetadata {
#[snafu(source)]
error: serde_json::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid scan index, start: {}, end: {}", start, end))]
InvalidScanIndex {
start: ManifestVersion,
@@ -1063,7 +1071,8 @@ impl ErrorExt for Error {
| NoCheckpoint { .. }
| NoManifests { .. }
| InstallManifestTo { .. }
| Unexpected { .. } => StatusCode::Unexpected,
| Unexpected { .. }
| SerializeColumnMetadata { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }

View File

@@ -6,7 +6,7 @@ license.workspace = true
[features]
testing = []
enterprise = ["common-meta/enterprise", "sql/enterprise"]
enterprise = ["common-meta/enterprise", "sql/enterprise", "query/enterprise"]
[lints]
workspace = true
@@ -35,6 +35,7 @@ common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-sql.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true

View File

@@ -829,13 +829,6 @@ pub enum Error {
location: Location,
},
#[cfg(feature = "enterprise")]
#[snafu(display("Trigger related operations are not currently supported"))]
UnsupportedTrigger {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid time index type: {}", ty))]
InvalidTimeIndexType {
ty: arrow::datatypes::DataType,
@@ -851,6 +844,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Sql common error"))]
SqlCommon {
source: common_sql::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -902,8 +902,6 @@ impl ErrorExt for Error {
Error::NotSupported { .. }
| Error::ShowCreateTableBaseOnly { .. }
| Error::SchemaReadOnly { .. } => StatusCode::Unsupported,
#[cfg(feature = "enterprise")]
Error::UnsupportedTrigger { .. } => StatusCode::Unsupported,
Error::TableMetadataManager { source, .. } => source.status_code(),
Error::ParseSql { source, .. } => source.status_code(),
Error::InvalidateTableCache { source, .. } => source.status_code(),
@@ -981,6 +979,7 @@ impl ErrorExt for Error {
Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
Error::PathNotFound { .. } => StatusCode::InvalidArguments,
Error::SqlCommon { source, .. } => source.status_code(),
}
}

View File

@@ -22,7 +22,6 @@ use datatypes::schema::{ColumnSchema, SchemaRef};
use partition::manager::PartitionRuleManager;
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements;
use sql::statements::insert::Insert;
use sqlparser::ast::{ObjectName, Value as SqlValue};
use table::metadata::TableInfoRef;
@@ -227,7 +226,7 @@ fn sql_value_to_grpc_value(
column: column.clone(),
})?
} else {
statements::sql_value_to_value(
common_sql::convert::sql_value_to_value(
column,
&column_schema.data_type,
sql_val,
@@ -235,7 +234,7 @@ fn sql_value_to_grpc_value(
None,
auto_string_to_numeric,
)
.context(ParseSqlSnafu)?
.context(crate::error::SqlCommonSnafu)?
};
let grpc_value = value_to_grpc_value(value);

View File

@@ -19,6 +19,7 @@ use common_function::function_registry::FUNCTION_REGISTRY;
use common_query::prelude::TypeSignature;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_sql::convert::sql_value_to_value;
use common_telemetry::tracing;
use common_time::Timezone;
use datatypes::data_type::DataType;
@@ -30,7 +31,6 @@ use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{Expr, FunctionArg, FunctionArgExpr, FunctionArguments, Value as SqlValue};
use sql::statements::admin::Admin;
use sql::statements::sql_value_to_value;
use crate::error::{self, Result};
use crate::statement::StatementExecutor;
@@ -186,7 +186,7 @@ fn values_to_vectors_by_exact_types(
.zip(exact_types.iter())
.map(|(value, data_type)| {
let value = sql_value_to_value(DUMMY_COLUMN, data_type, value, tz, None, false)
.context(error::ParseSqlValueSnafu)?;
.context(error::SqlCommonSnafu)?;
Ok(value_to_vector(value))
})

View File

@@ -45,6 +45,7 @@ use common_meta::rpc::ddl::{
};
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_query::Output;
use common_sql::convert::sql_value_to_value;
use common_telemetry::{debug, info, tracing, warn};
use common_time::Timezone;
use datafusion_common::tree_node::TreeNodeVisitor;
@@ -71,7 +72,6 @@ use sql::statements::create::trigger::CreateTrigger;
use sql::statements::create::{
CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, UnaryOperator, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
@@ -87,10 +87,10 @@ use crate::error::{
ColumnNotFoundSnafu, ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu,
DeserializePartitionSnafu, EmptyDdlExprSnafu, ExternalSnafu, ExtractTableNamesSnafu,
FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu, InvalidSqlSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, Result, SchemaInUseSnafu,
SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
ViewAlreadyExistsSnafu,
};
use crate::expr_helper;
use crate::statement::show::create_partitions_stmt;
@@ -1859,7 +1859,7 @@ fn convert_value(
unary_op,
false,
)
.context(ParseSqlValueSnafu)
.context(error::SqlCommonSnafu)
}
#[cfg(test)]

View File

@@ -231,10 +231,12 @@ impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub(super) async fn show_triggers(
&self,
_stmt: sql::statements::show::trigger::ShowTriggers,
_query_ctx: QueryContextRef,
stmt: sql::statements::show::trigger::ShowTriggers,
query_ctx: QueryContextRef,
) -> Result<Output> {
crate::error::UnsupportedTriggerSnafu.fail()
query::sql::show_triggers(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
.await
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]

View File

@@ -7,6 +7,9 @@ license.workspace = true
[lints]
workspace = true
[features]
enterprise = []
[dependencies]
ahash.workspace = true
api.workspace = true

View File

@@ -155,7 +155,23 @@ struct PlanRewriter {
/// Partition columns of the table in current pass
partition_cols: Option<Vec<String>>,
column_requirements: HashSet<Column>,
/// Whether to expand on next call
/// This is used to handle the case where a plan is transformed, but need to be expanded from it's
/// parent node. For example a Aggregate plan is split into two parts in frontend and datanode, and need
/// to be expanded from the parent node of the Aggregate plan.
expand_on_next_call: bool,
/// Expanding on next partial/conditional/transformed commutative plan
/// This is used to handle the case where a plan is transformed, but still
/// need to push down as many node as possible before next partial/conditional/transformed commutative
/// plan. I.e.
/// ```
/// Limit:
/// Sort:
/// ```
/// where `Limit` is partial commutative, and `Sort` is conditional commutative.
/// In this case, we need to expand the `Limit` plan,
/// so that we can push down the `Sort` plan as much as possible.
expand_on_next_part_cond_trans_commutative: bool,
new_child_plan: Option<LogicalPlan>,
}
@@ -177,15 +193,38 @@ impl PlanRewriter {
{
return true;
}
if self.expand_on_next_call {
self.expand_on_next_call = false;
return true;
}
if self.expand_on_next_part_cond_trans_commutative {
let comm = Categorizer::check_plan(plan, self.partition_cols.clone());
match comm {
Commutativity::PartialCommutative => {
// a small difference is that for partial commutative, we still need to
// expand on next call(so `Limit` can be pushed down)
self.expand_on_next_part_cond_trans_commutative = false;
self.expand_on_next_call = true;
}
Commutativity::ConditionalCommutative(_)
| Commutativity::TransformedCommutative { .. } => {
// for conditional commutative and transformed commutative, we can
// expand now
self.expand_on_next_part_cond_trans_commutative = false;
return true;
}
_ => (),
}
}
match Categorizer::check_plan(plan, self.partition_cols.clone()) {
Commutativity::Commutative => {}
Commutativity::PartialCommutative => {
if let Some(plan) = partial_commutative_transformer(plan) {
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -194,6 +233,7 @@ impl PlanRewriter {
&& let Some(plan) = transformer(plan)
{
self.update_column_requirements(&plan);
self.expand_on_next_part_cond_trans_commutative = true;
self.stage.push(plan)
}
}
@@ -202,7 +242,7 @@ impl PlanRewriter {
&& let Some(transformer_actions) = transformer(plan)
{
debug!(
"PlanRewriter: transformed plan: {:#?}\n from {plan}",
"PlanRewriter: transformed plan: {:?}\n from {plan}",
transformer_actions.extra_parent_plans
);
if let Some(last_stage) = transformer_actions.extra_parent_plans.last() {
@@ -226,6 +266,10 @@ impl PlanRewriter {
}
fn update_column_requirements(&mut self, plan: &LogicalPlan) {
debug!(
"PlanRewriter: update column requirements for plan: {plan}\n withcolumn_requirements: {:?}",
self.column_requirements
);
let mut container = HashSet::new();
for expr in plan.expressions() {
// this method won't fail
@@ -235,6 +279,10 @@ impl PlanRewriter {
for col in container {
self.column_requirements.insert(col);
}
debug!(
"PlanRewriter: updated column requirements: {:?}",
self.column_requirements
);
}
fn is_expanded(&self) -> bool {

View File

@@ -181,6 +181,15 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
is_batch_coalesced = true;
}
// only a very limited set of plans can exist between region scan and sort exec
// other plans might make this optimize wrong, so be safe here by limiting it
if !(plan.as_any().is::<ProjectionExec>()
|| plan.as_any().is::<FilterExec>()
|| plan.as_any().is::<CoalesceBatchesExec>())
{
partition_ranges = None;
}
// TODO(discord9): do this in logical plan instead as it's lessy bugy there
// Collects alias of the time index column.
if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
@@ -194,6 +203,14 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
}
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
// `PerSeries` distribution is not supported in windowed sort.
if region_scan_exec.distribution()
== Some(store_api::storage::TimeSeriesDistribution::PerSeries)
{
partition_ranges = None;
return Ok(Transformed::no(plan));
}
partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
// Reset time index column.
time_index = HashSet::from([region_scan_exec.time_index()]);

View File

@@ -96,9 +96,10 @@ impl PartSortExec {
if partition >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
"Partition index out of range: {} >= {} at {}",
partition,
self.partition_ranges.len()
self.partition_ranges.len(),
snafu::location!()
)?;
}
@@ -322,9 +323,10 @@ impl PartSortStream {
) -> datafusion_common::Result<()> {
if self.cur_part_idx >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
"Partition index out of range: {} >= {} at {}",
self.cur_part_idx,
self.partition_ranges.len()
self.partition_ranges.len(),
snafu::location!()
)?;
}
let cur_range = self.partition_ranges[self.cur_part_idx];
@@ -355,9 +357,10 @@ impl PartSortStream {
// check if the current partition index is out of range
if self.cur_part_idx >= self.partition_ranges.len() {
internal_err!(
"Partition index out of range: {} >= {}",
"Partition index out of range: {} >= {} at {}",
self.cur_part_idx,
self.partition_ranges.len()
self.partition_ranges.len(),
snafu::location!()
)?;
}
let cur_range = self.partition_ranges[self.cur_part_idx];

View File

@@ -950,6 +950,37 @@ pub async fn show_flows(
.await
}
#[cfg(feature = "enterprise")]
pub async fn show_triggers(
stmt: sql::statements::show::trigger::ShowTriggers,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
use catalog::information_schema::TRIGGER_LIST;
const TRIGGER_NAME: &str = "trigger_name";
const TRIGGERS_COLUMN: &str = "Triggers";
let projects = vec![(TRIGGER_NAME, TRIGGERS_COLUMN)];
let like_field = Some(TRIGGER_NAME);
let sort = vec![col(TRIGGER_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
TRIGGER_LIST,
vec![],
projects,
vec![],
like_field,
sort,
stmt.kind,
)
.await
}
pub fn show_create_flow(
flow_name: ObjectName,
flow_val: FlowInfoValue,

View File

@@ -48,6 +48,7 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-session.workspace = true
common-sql.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version = { workspace = true, features = ["codec"] }

View File

@@ -17,6 +17,7 @@ use std::time::Duration;
use chrono::NaiveDate;
use common_query::prelude::ScalarValue;
use common_sql::convert::sql_value_to_value;
use common_time::Timestamp;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::LogicalPlan;
@@ -27,7 +28,6 @@ use itertools::Itertools;
use opensrv_mysql::{to_naive_datetime, ParamValue, ValueInner};
use snafu::ResultExt;
use sql::ast::{visit_expressions_mut, Expr, Value as ValueExpr, VisitMut};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use crate::error::{self, DataFusionSnafu, Result};

View File

@@ -19,6 +19,7 @@ common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-sql.workspace = true
common-time.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true

View File

@@ -17,16 +17,13 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::DataFusionError;
use datafusion_sql::sqlparser::ast::UnaryOperator;
use datatypes::prelude::{ConcreteDataType, Value};
use snafu::{Location, Snafu};
use sqlparser::ast::Ident;
use sqlparser::parser::ParserError;
use crate::ast::{Expr, Value as SqlValue};
use crate::ast::Expr;
use crate::parsers::error::TQLError;
pub type Result<T> = std::result::Result<T, Error>;
@@ -59,18 +56,6 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Unsupported expr in default constraint: {:?} for column: {}",
expr,
column_name
))]
UnsupportedDefaultValue {
column_name: String,
expr: Expr,
#[snafu(implicit)]
location: Location,
},
// Syntax error from sql parser.
#[snafu(display("Invalid SQL syntax"))]
Syntax {
@@ -218,30 +203,6 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display("Failed to cast SQL value {} to datatype {}", sql_value, datatype))]
InvalidCast {
sql_value: sqlparser::ast::Value,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Invalid unary operator {} for value {}", unary_op, value))]
InvalidUnaryOp {
unary_op: UnaryOperator,
value: Value,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported unary operator {}", unary_op))]
UnsupportedUnaryOp {
unary_op: UnaryOperator,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unrecognized table option key: {}", key))]
InvalidTableOption {
key: String,
@@ -271,25 +232,6 @@ pub enum Error {
source: api::error::Error,
},
#[snafu(display("Invalid sql value: {}", value))]
InvalidSqlValue {
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Converting timestamp {:?} to unit {:?} overflow",
timestamp,
target_unit
))]
TimestampOverflow {
timestamp: Timestamp,
target_unit: TimeUnit,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unable to convert statement {} to DataFusion statement", statement))]
ConvertToDfStatement {
statement: String,
@@ -297,14 +239,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Unable to convert sql value {} to datatype {:?}", value, datatype))]
ConvertSqlValue {
value: SqlValue,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unable to convert value {} to sql value", value))]
ConvertValue {
value: Value,
@@ -354,13 +288,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Datatype error: {}", source))]
Datatype {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Invalid partition number: {}, should be in range [2, 65536]",
partition_num
@@ -371,14 +298,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Unable to convert {} to datatype {:?}", value, datatype))]
ConvertStr {
value: String,
datatype: ConcreteDataType,
#[snafu(implicit)]
location: Location,
},
#[cfg(feature = "enterprise")]
#[snafu(display("Missing `{}` clause", name))]
MissingClause {
@@ -410,6 +329,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Sql common error"))]
SqlCommon {
source: common_sql::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -417,7 +343,7 @@ impl ErrorExt for Error {
use Error::*;
match self {
UnsupportedDefaultValue { .. } | Unsupported { .. } => StatusCode::Unsupported,
Unsupported { .. } => StatusCode::Unsupported,
Unexpected { .. }
| Syntax { .. }
| TQLSyntax { .. }
@@ -441,17 +367,11 @@ impl ErrorExt for Error {
| InvalidTableName { .. }
| InvalidFlowName { .. }
| InvalidFlowQuery { .. }
| InvalidSqlValue { .. }
| TimestampOverflow { .. }
| InvalidTableOption { .. }
| InvalidCast { .. }
| ConvertToLogicalExpression { .. }
| Simplification { .. }
| InvalidInterval { .. }
| InvalidUnaryOp { .. }
| InvalidPartitionNumber { .. }
| UnsupportedUnaryOp { .. }
| ConvertStr { .. } => StatusCode::InvalidArguments,
| InvalidPartitionNumber { .. } => StatusCode::InvalidArguments,
#[cfg(feature = "enterprise")]
InvalidTriggerName { .. } => StatusCode::InvalidArguments,
@@ -463,9 +383,9 @@ impl ErrorExt for Error {
SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
ConvertToGrpcDataType { source, .. } => source.status_code(),
Datatype { source, .. } => source.status_code(),
SqlCommon { source, .. } => source.status_code(),
ConvertToDfStatement { .. } => StatusCode::Internal,
ConvertSqlValue { .. } | ConvertValue { .. } => StatusCode::Unsupported,
ConvertValue { .. } => StatusCode::Unsupported,
PermissionDenied { .. } => StatusCode::PermissionDenied,
SetFulltextOption { .. } | SetSkippingIndexOption { .. } => StatusCode::Unexpected,

File diff suppressed because it is too large Load Diff

View File

@@ -233,3 +233,65 @@ DROP TABLE lightning;
Affected Rows: 0
CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
`greptime_timestamp` TIMESTAMP(3) NOT NULL,
`greptime_value` DOUBLE NULL,
`instance` STRING NULL,
`job` STRING NULL,
TIME INDEX (`greptime_timestamp`),
PRIMARY KEY (`instance`, `job`)
);
Affected Rows: 0
INSERT INTO `instance_job_metrics` VALUES
('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
Affected Rows: 3
TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+---------------------+------------------------------------------+
| greptime_timestamp | sum(instance_job_metrics.greptime_value) |
+---------------------+------------------------------------------+
| 2023-10-01T00:00:01 | 1696118400.0 |
| 2023-10-01T00:00:02 | 3392236800.0 |
| 2023-10-01T00:00:03 | 5088355200.0 |
| 2023-10-01T00:00:04 | 5088355200.0 |
| 2023-10-01T00:00:05 | 5088355200.0 |
+---------------------+------------------------------------------+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(instance_job_REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_PromInstantManipulateExec: range=[1696118400000..1696118405000], lookback=[300000], interval=[1000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["instance", "job"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":1, "mem_ranges":1, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
DROP TABLE IF EXISTS `instance_job_metrics`;
Affected Rows: 0

View File

@@ -120,3 +120,30 @@ ORDER BY
true_collect_time DESC;
DROP TABLE lightning;
CREATE TABLE IF NOT EXISTS `instance_job_metrics` (
`greptime_timestamp` TIMESTAMP(3) NOT NULL,
`greptime_value` DOUBLE NULL,
`instance` STRING NULL,
`job` STRING NULL,
TIME INDEX (`greptime_timestamp`),
PRIMARY KEY (`instance`, `job`)
);
INSERT INTO `instance_job_metrics` VALUES
('2023-10-01 00:00:01.000', 1696118400.0, 'node1', 'job1'),
('2023-10-01 00:00:02.000', 1696118400.0, 'node2', 'job1'),
('2023-10-01 00:00:03.000', 1696118400.0, 'node3', 'job2');
TQL EVAL('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
TQL ANALYZE('2023-10-01 00:00:00.000'::TIMESTAMP, '2023-10-01 00:00:05.000'::TIMESTAMP, '1s') sum(instance_job_metrics);
DROP TABLE IF EXISTS `instance_job_metrics`;

View File

@@ -234,3 +234,57 @@ drop table test;
Affected Rows: 0
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
Affected Rows: 0
TQL EVAL sum(test2);
++
++
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_SortPreservingMergeExec: [greptime_timestamp@0 ASC NULLS LAST] REDACTED
|_|_|_SortExec: expr=[greptime_timestamp@0 ASC NULLS LAST], preserve_partitioning=[true] REDACTED
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_RepartitionExec: partitioning=Hash([greptime_timestamp@0], 20), input_partitions=20 REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[greptime_timestamp@0 as greptime_timestamp], aggr=[sum(test2.greptime_value)] REDACTED
|_|_|_ProjectionExec: expr=[greptime_timestamp@0 as greptime_timestamp, greptime_value@1 as greptime_value] REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
| 1_| 1_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[greptime_timestamp] REDACTED
|_|_|_PromSeriesDivideExec: tags=["shard"] REDACTED
|_|_|_SeriesScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0}, "distribution":"PerSeries" REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
DROP TABLE test2;
Affected Rows: 0

View File

@@ -95,3 +95,27 @@ TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;
TQL ANALYZE FORMAT TEXT (0, 10, '5s') test;
drop table test;
CREATE TABLE test2 (
"greptime_timestamp" TIMESTAMP(3) NOT NULL,
"greptime_value" DOUBLE NULL,
"shard" STRING NULL INVERTED INDEX,
TIME INDEX ("greptime_timestamp"),
PRIMARY KEY ("shard")
)
PARTITION ON COLUMNS ("shard") (
shard <= '2',
shard > '2'
);
TQL EVAL sum(test2);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
TQL ANALYZE sum(test2);
DROP TABLE test2;