From 16c1ee26180316ec1337cb61ef174ebd171a79aa Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 20 Jun 2023 18:26:55 +0800 Subject: [PATCH 01/13] feat: incremental database backup (#1240) * feat: incremental database backup * chore: rebase develop * chore: move backup to StatementExecutor * feat: copy database parser * chore: remove some todos * chore: use timestamp string instead of i64 string * fix: typo --- Cargo.lock | 1 + src/common/datasource/src/file_format.rs | 10 ++ src/common/query/src/logical_plan.rs | 2 +- src/common/query/src/logical_plan/expr.rs | 53 +++++++++ src/common/time/src/range.rs | 25 +++- src/datanode/src/instance/sql.rs | 16 +++ src/frontend/Cargo.toml | 1 + src/frontend/src/error.rs | 8 ++ src/frontend/src/instance.rs | 5 +- src/frontend/src/statement.rs | 71 ++++++++++- src/frontend/src/statement/backup.rs | 97 +++++++++++++++ src/frontend/src/statement/copy_table_from.rs | 5 +- src/frontend/src/statement/copy_table_to.rs | 30 +++-- src/sql/src/lib.rs | 1 + src/sql/src/parsers/copy_parser.rs | 110 +++++++++++++++--- src/sql/src/statements/copy.rs | 15 +++ src/sql/src/statements/statement.rs | 3 +- src/table/src/requests.rs | 12 ++ 18 files changed, 427 insertions(+), 38 deletions(-) create mode 100644 src/frontend/src/statement/backup.rs diff --git a/Cargo.lock b/Cargo.lock index 67ec748de3..24f9769e5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3222,6 +3222,7 @@ dependencies = [ "common-runtime", "common-telemetry", "common-test-util", + "common-time", "datafusion", "datafusion-common", "datafusion-expr", diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index bcb777a9dd..81c7443d8d 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -58,6 +58,16 @@ pub enum Format { Parquet(ParquetFormat), } +impl Format { + pub fn suffix(&self) -> &'static str { + match self { + Format::Csv(_) => ".csv", + Format::Json(_) => ".json", + Format::Parquet(_) => ".parquet", + } + } +} + impl TryFrom<&HashMap> for Format { type Error = error::Error; diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs index 809d033baf..0b8d67ae6a 100644 --- a/src/common/query/src/logical_plan.rs +++ b/src/common/query/src/logical_plan.rs @@ -20,6 +20,7 @@ mod udf; use std::sync::Arc; use datatypes::prelude::ConcreteDataType; +pub use expr::build_filter_from_timestamp; pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef}; pub use self::expr::{DfExpr, Expr}; @@ -28,7 +29,6 @@ pub use self::udf::ScalarUdf; use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation}; use crate::logical_plan::accumulator::*; use crate::signature::{Signature, Volatility}; - /// Creates a new UDF with a specific signature and specific return type. /// This is a helper function to create a new UDF. /// The function `create_udf` returns a subset of all possible `ScalarFunction`: diff --git a/src/common/query/src/logical_plan/expr.rs b/src/common/query/src/logical_plan/expr.rs index 0af6aa3922..5744b4f486 100644 --- a/src/common/query/src/logical_plan/expr.rs +++ b/src/common/query/src/logical_plan/expr.rs @@ -12,7 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_time::range::TimestampRange; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; +use datafusion_common::{Column, ScalarValue}; pub use datafusion_expr::expr::Expr as DfExpr; +use datafusion_expr::{and, binary_expr, Operator}; /// Central struct of query API. /// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`. @@ -33,6 +38,54 @@ impl From for Expr { } } +/// Builds an `Expr` that filters timestamp column from given timestamp range. +/// Returns [None] if time range is [None] or full time range. +pub fn build_filter_from_timestamp( + ts_col_name: &str, + time_range: Option<&TimestampRange>, +) -> Option { + let Some(time_range) = time_range else { return None; }; + let ts_col_expr = DfExpr::Column(Column { + relation: None, + name: ts_col_name.to_string(), + }); + + let df_expr = match (time_range.start(), time_range.end()) { + (None, None) => None, + (Some(start), None) => Some(binary_expr( + ts_col_expr, + Operator::GtEq, + timestamp_to_literal(start), + )), + (None, Some(end)) => Some(binary_expr( + ts_col_expr, + Operator::Lt, + timestamp_to_literal(end), + )), + (Some(start), Some(end)) => Some(and( + binary_expr( + ts_col_expr.clone(), + Operator::GtEq, + timestamp_to_literal(start), + ), + binary_expr(ts_col_expr, Operator::Lt, timestamp_to_literal(end)), + )), + }; + + df_expr.map(Expr::from) +} + +/// Converts a [Timestamp] to datafusion literal value. +fn timestamp_to_literal(timestamp: &Timestamp) -> DfExpr { + let scalar_value = match timestamp.unit() { + TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None), + TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(timestamp.value()), None), + TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None), + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None), + }; + DfExpr::Literal(scalar_value) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/common/time/src/range.rs b/src/common/time/src/range.rs index ce61fea10e..0461378449 100644 --- a/src/common/time/src/range.rs +++ b/src/common/time/src/range.rs @@ -14,6 +14,8 @@ use std::fmt::{Debug, Display, Formatter}; +use serde::{Deserialize, Serialize}; + use crate::timestamp::TimeUnit; use crate::timestamp_millis::TimestampMillis; use crate::Timestamp; @@ -23,7 +25,7 @@ use crate::Timestamp; /// The range contains values that `value >= start` and `val < end`. /// /// The range is empty iff `start == end == "the default value of T"` -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct GenericRange { start: Option, end: Option, @@ -522,4 +524,25 @@ mod tests { ); assert!(range.is_empty()); } + + #[test] + fn test_serialize_timestamp_range() { + macro_rules! test_serde_for_unit { + ($($unit: expr),*) => { + $( + let original_range = TimestampRange::with_unit(0, 10, $unit).unwrap(); + let string = serde_json::to_string(&original_range).unwrap(); + let deserialized: TimestampRange = serde_json::from_str(&string).unwrap(); + assert_eq!(original_range, deserialized); + )* + }; + } + + test_serde_for_unit!( + TimeUnit::Second, + TimeUnit::Millisecond, + TimeUnit::Microsecond, + TimeUnit::Nanosecond + ); + } } diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index f0eda492b4..3b7b5f88be 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -228,6 +228,22 @@ pub fn table_idents_to_full_name( } } +pub fn idents_to_full_database_name( + obj_name: &ObjectName, + query_ctx: &QueryContextRef, +) -> Result<(String, String)> { + match &obj_name.0[..] { + [database] => Ok((query_ctx.current_catalog(), database.value.clone())), + [catalog, database] => Ok((catalog.value.clone(), database.value.clone())), + _ => error::InvalidSqlSnafu { + msg: format!( + "expect database name to be ., , found: {obj_name}", + ), + } + .fail(), + } +} + #[async_trait] impl SqlStatementExecutor for Instance { async fn execute_sql( diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 4487b10e37..207be3e093 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -29,6 +29,7 @@ common-meta = { path = "../common/meta" } common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } +common-time = { path = "../common/time" } datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 7aba14e118..bdfa8d697d 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -548,6 +548,13 @@ pub enum Error { #[snafu(backtrace)] source: query::error::Error, }, + + #[snafu(display("Invalid COPY parameter, key: {}, value: {}", key, value))] + InvalidCopyParameter { + key: String, + value: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -667,6 +674,7 @@ impl ErrorExt for Error { | Error::BuildBackend { source } => source.status_code(), Error::WriteParquet { source, .. } => source.status_code(), + Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments, } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 3507b7e36e..0f116c50d2 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -627,12 +627,15 @@ pub fn check_permission( Statement::DescribeTable(stmt) => { validate_param(stmt.name(), query_ctx)?; } - Statement::Copy(stmd) => match stmd { + Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt { CopyTable::To(copy_table_to) => validate_param(©_table_to.table_name, query_ctx)?, CopyTable::From(copy_table_from) => { validate_param(©_table_from.table_name, query_ctx)? } }, + Statement::Copy(sql::statements::copy::Copy::CopyDatabase(stmt)) => { + validate_param(&stmt.database_name, query_ctx)? + } } Ok(()) } diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index d3cd1d82aa..4030452fc9 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -12,32 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod backup; mod copy_table_from; mod copy_table_to; mod describe; mod show; mod tql; +use std::collections::HashMap; +use std::str::FromStr; + use catalog::CatalogManagerRef; use common_error::prelude::BoxedError; use common_query::Output; use common_recordbatch::RecordBatches; -use datanode::instance::sql::table_idents_to_full_name; +use common_time::range::TimestampRange; +use common_time::Timestamp; +use datanode::instance::sql::{idents_to_full_database_name, table_idents_to_full_name}; use query::parser::QueryStatement; use query::query_engine::SqlStatementExecutorRef; use query::QueryEngineRef; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; -use sql::statements::copy::{CopyTable, CopyTableArgument}; +use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; use sql::statements::statement::Statement; use table::engine::TableReference; -use table::requests::{CopyDirection, CopyTableRequest}; +use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; use table::TableRef; +use crate::error; use crate::error::{ CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, }; +use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; #[derive(Clone)] pub struct StatementExecutor { @@ -92,14 +100,23 @@ impl StatementExecutor { Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await, - Statement::Copy(stmt) => { + Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => { let req = to_copy_table_request(stmt, query_ctx)?; match req.direction { - CopyDirection::Export => self.copy_table_to(req).await, - CopyDirection::Import => self.copy_table_from(req).await, + CopyDirection::Export => { + self.copy_table_to(req).await.map(Output::AffectedRows) + } + CopyDirection::Import => { + self.copy_table_from(req).await.map(Output::AffectedRows) + } } } + Statement::Copy(sql::statements::copy::Copy::CopyDatabase(arg)) => { + self.copy_database(to_copy_database_request(arg, &query_ctx)?) + .await + } + Statement::CreateDatabase(_) | Statement::CreateTable(_) | Statement::CreateExternalTable(_) @@ -191,5 +208,47 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result< connection, pattern, direction, + // we copy the whole table by default. + timestamp_range: None, }) } + +/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest]. +/// This function extracts the necessary info including catalog/database name, time range, etc. +fn to_copy_database_request( + arg: CopyDatabaseArgument, + query_ctx: &QueryContextRef, +) -> Result { + let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let start_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_START_KEY)?; + let end_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_END_KEY)?; + + let time_range = match (start_timestamp, end_timestamp) { + (Some(start), Some(end)) => TimestampRange::new(start, end), + (Some(start), None) => Some(TimestampRange::from_start(start)), + (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end + (None, None) => None, + }; + + Ok(CopyDatabaseRequest { + catalog_name, + schema_name: database_name, + location: arg.location, + with: arg.with, + connection: arg.connection, + time_range, + }) +} + +/// Extracts timestamp from a [HashMap] with given key. +fn extract_timestamp(map: &HashMap, key: &str) -> Result> { + map.get(key) + .map(|v| { + Timestamp::from_str(v) + .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build()) + }) + .transpose() +} diff --git a/src/frontend/src/statement/backup.rs b/src/frontend/src/statement/backup.rs new file mode 100644 index 0000000000..bdd99f73c3 --- /dev/null +++ b/src/frontend/src/statement/backup.rs @@ -0,0 +1,97 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_datasource::file_format::Format; +use common_query::Output; +use common_telemetry::info; +use snafu::{ensure, OptionExt, ResultExt}; +use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; + +use crate::error; +use crate::error::{ + CatalogNotFoundSnafu, CatalogSnafu, InvalidCopyParameterSnafu, SchemaNotFoundSnafu, +}; +use crate::statement::StatementExecutor; + +pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time"; +pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time"; + +impl StatementExecutor { + pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result { + // location must end with / so that every table is exported to a file. + ensure!( + req.location.ends_with('/'), + InvalidCopyParameterSnafu { + key: "location", + value: req.location, + } + ); + + info!( + "Copy database {}.{}, dir: {},. time: {:?}", + req.catalog_name, req.schema_name, req.location, req.time_range + ); + let schema = self + .catalog_manager + .catalog(&req.catalog_name) + .await + .context(CatalogSnafu)? + .context(CatalogNotFoundSnafu { + catalog_name: &req.catalog_name, + })? + .schema(&req.schema_name) + .await + .context(CatalogSnafu)? + .context(SchemaNotFoundSnafu { + schema_info: &req.schema_name, + })?; + + let suffix = Format::try_from(&req.with) + .context(error::ParseFileFormatSnafu)? + .suffix(); + + let table_names = schema.table_names().await.context(CatalogSnafu)?; + + let mut exported_rows = 0; + for table_name in table_names { + // TODO(hl): remove this hardcode once we've removed numbers table. + if table_name == "numbers" { + continue; + } + let mut table_file = req.location.clone(); + table_file.push_str(&table_name); + table_file.push_str(suffix); + info!( + "Copy table: {}.{}.{} to {}", + req.catalog_name, req.schema_name, table_name, table_file + ); + + let exported = self + .copy_table_to(CopyTableRequest { + catalog_name: req.catalog_name.clone(), + schema_name: req.schema_name.clone(), + table_name, + location: table_file, + with: req.with.clone(), + connection: req.connection.clone(), + pattern: None, + direction: CopyDirection::Export, + timestamp_range: req.time_range, + }) + .await?; + exported_rows += exported; + } + Ok(Output::AffectedRows(exported_rows)) + } +} diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs index 8f590b5109..617195a5e9 100644 --- a/src/frontend/src/statement/copy_table_from.rs +++ b/src/frontend/src/statement/copy_table_from.rs @@ -24,7 +24,6 @@ use common_datasource::file_format::{FileFormat, Format}; use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::{build_backend, parse_url}; use common_datasource::util::find_dir_and_filename; -use common_query::Output; use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter; use common_recordbatch::DfSendableRecordBatchStream; use datafusion::datasource::listing::PartitionedFile; @@ -205,7 +204,7 @@ impl StatementExecutor { } } - pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result { + pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result { let table_ref = TableReference { catalog: &req.catalog_name, schema: &req.schema_name, @@ -313,7 +312,7 @@ impl StatementExecutor { } } - Ok(Output::AffectedRows(rows_inserted)) + Ok(rows_inserted) } } diff --git a/src/frontend/src/statement/copy_table_to.rs b/src/frontend/src/statement/copy_table_to.rs index aed13b57e4..77fba4a499 100644 --- a/src/frontend/src/statement/copy_table_to.rs +++ b/src/frontend/src/statement/copy_table_to.rs @@ -18,7 +18,6 @@ use common_datasource::file_format::json::stream_to_json; use common_datasource::file_format::Format; use common_datasource::object_store::{build_backend, parse_url}; use common_query::physical_plan::SessionContext; -use common_query::Output; use common_recordbatch::adapter::DfRecordBatchStreamAdapter; use common_recordbatch::SendableRecordBatchStream; use object_store::ObjectStore; @@ -72,7 +71,7 @@ impl StatementExecutor { } } - pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result { + pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result { let table_ref = TableReference { catalog: &req.catalog_name, schema: &req.schema_name, @@ -82,12 +81,25 @@ impl StatementExecutor { let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?; - let stream = table - .scan(None, &[], None) - .await - .with_context(|_| error::CopyTableSnafu { - table_name: table_ref.to_string(), - })?; + let filters = table + .schema() + .timestamp_column() + .and_then(|c| { + common_query::logical_plan::build_filter_from_timestamp( + &c.name, + req.timestamp_range.as_ref(), + ) + }) + .into_iter() + .collect::>(); + + let stream = + table + .scan(None, &filters, None) + .await + .with_context(|_| error::CopyTableSnafu { + table_name: table_ref.to_string(), + })?; let stream = stream .execute(0, SessionContext::default().task_ctx()) @@ -101,6 +113,6 @@ impl StatementExecutor { .stream_to_file(stream, &format, object_store, &path) .await?; - Ok(Output::AffectedRows(rows_copied)) + Ok(rows_copied) } } diff --git a/src/sql/src/lib.rs b/src/sql/src/lib.rs index 027ecf2817..ef9ef453fc 100644 --- a/src/sql/src/lib.rs +++ b/src/sql/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(box_patterns)] #![feature(assert_matches)] +#![feature(let_chains)] pub mod ast; pub mod dialect; diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index 0e56f90757..1bad3a3f7f 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -12,22 +12,60 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use snafu::ResultExt; use sqlparser::ast::ObjectName; use sqlparser::keywords::Keyword; +use sqlparser::tokenizer::Token::Word; use crate::error::{self, Result}; use crate::parser::ParserContext; -use crate::statements::copy::{CopyTable, CopyTableArgument}; +use crate::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument}; use crate::statements::statement::Statement; use crate::util::parse_option_string; +pub type With = HashMap; +pub type Connection = HashMap; + // COPY tbl TO 'output.parquet'; impl<'a> ParserContext<'a> { pub(crate) fn parse_copy(&mut self) -> Result { self.parser.next_token(); - let copy_table = self.parse_copy_table()?; - Ok(Statement::Copy(copy_table)) + let next = self.parser.peek_token(); + let copy = if let Word(word) = next.token && word.keyword == Keyword::DATABASE { + self.parser.next_token(); + let copy_database = self.parser_copy_database()?; + crate::statements::copy::Copy::CopyDatabase(copy_database) + } else { + let copy_table = self.parse_copy_table()?; + crate::statements::copy::Copy::CopyTable(copy_table) + }; + + Ok(Statement::Copy(copy)) + } + + fn parser_copy_database(&mut self) -> Result { + let database_name = + self.parser + .parse_object_name() + .with_context(|_| error::UnexpectedSnafu { + sql: self.sql, + expected: "a database name", + actual: self.peek_token_as_string(), + })?; + + self.parser + .expect_keyword(Keyword::TO) + .context(error::SyntaxSnafu { sql: self.sql })?; + + let (with, connection, location) = self.parse_copy_to()?; + Ok(CopyDatabaseArgument { + database_name, + with, + connection, + location, + }) } fn parse_copy_table(&mut self) -> Result { @@ -41,7 +79,13 @@ impl<'a> ParserContext<'a> { })?; if self.parser.parse_keyword(Keyword::TO) { - Ok(CopyTable::To(self.parse_copy_table_to(table_name)?)) + let (with, connection, location) = self.parse_copy_to()?; + Ok(CopyTable::To(CopyTableArgument { + table_name, + with, + connection, + location, + })) } else { self.parser .expect_keyword(Keyword::FROM) @@ -91,7 +135,7 @@ impl<'a> ParserContext<'a> { }) } - fn parse_copy_table_to(&mut self, table_name: ObjectName) -> Result { + fn parse_copy_to(&mut self) -> Result<(With, Connection, String)> { let location = self.parser .parse_literal_string() @@ -125,12 +169,7 @@ impl<'a> ParserContext<'a> { }) .collect(); - Ok(CopyTableArgument { - table_name, - with, - connection, - location, - }) + Ok((with, connection, location)) } } @@ -139,8 +178,11 @@ mod tests { use std::assert_matches::assert_matches; use std::collections::HashMap; + use sqlparser::ast::Ident; + use super::*; use crate::dialect::GreptimeDbDialect; + use crate::statements::statement::Statement::Copy; #[test] fn test_parse_copy_table() { @@ -155,7 +197,8 @@ mod tests { let statement = result.remove(0); assert_matches!(statement, Statement::Copy { .. }); match statement { - Statement::Copy(CopyTable::To(copy_table)) => { + Copy(copy) => { + let crate::statements::copy::Copy::CopyTable(CopyTable::To(copy_table)) = copy else { unreachable!() }; let (catalog, schema, table) = if let [catalog, schema, table] = ©_table.table_name.0[..] { ( @@ -198,7 +241,9 @@ mod tests { let statement = result.remove(0); assert_matches!(statement, Statement::Copy { .. }); match statement { - Statement::Copy(CopyTable::From(copy_table)) => { + Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::From( + copy_table, + ))) => { let (catalog, schema, table) = if let [catalog, schema, table] = ©_table.table_name.0[..] { ( @@ -254,7 +299,9 @@ mod tests { let statement = result.remove(0); assert_matches!(statement, Statement::Copy { .. }); match statement { - Statement::Copy(CopyTable::From(copy_table)) => { + Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::From( + copy_table, + ))) => { if let Some(expected_pattern) = test.expected_pattern { assert_eq!(copy_table.pattern().unwrap(), expected_pattern); } @@ -295,11 +342,44 @@ mod tests { let statement = result.remove(0); assert_matches!(statement, Statement::Copy { .. }); match statement { - Statement::Copy(CopyTable::To(copy_table)) => { + Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::To( + copy_table, + ))) => { assert_eq!(copy_table.connection.clone(), test.expected_connection); } _ => unreachable!(), } } } + + #[test] + fn test_copy_database_to() { + let sql = "COPY DATABASE catalog0.schema0 TO 'tbl_file.parquet' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')"; + let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}) + .unwrap() + .pop() + .unwrap(); + + let Statement::Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else { unreachable!() }; + assert_eq!( + ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]), + stmt.database_name + ); + assert_eq!( + [("format".to_string(), "parquet".to_string())] + .into_iter() + .collect::>(), + stmt.with + ); + + assert_eq!( + [ + ("foo".to_string(), "Bar".to_string()), + ("one".to_string(), "two".to_string()) + ] + .into_iter() + .collect::>(), + stmt.connection + ); + } } diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs index 35dfe1bfe1..494c07c707 100644 --- a/src/sql/src/statements/copy.rs +++ b/src/sql/src/statements/copy.rs @@ -15,12 +15,27 @@ use std::collections::HashMap; use sqlparser::ast::ObjectName; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Copy { + CopyTable(CopyTable), + CopyDatabase(CopyDatabaseArgument), +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum CopyTable { To(CopyTableArgument), From(CopyTableArgument), } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CopyDatabaseArgument { + pub database_name: ObjectName, + pub with: HashMap, + pub connection: HashMap, + pub location: String, +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct CopyTableArgument { pub table_name: ObjectName, diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 37e1363265..20a608ae5d 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -17,7 +17,6 @@ use sqlparser::ast::Statement as SpStatement; use crate::error::{ConvertToDfStatementSnafu, Error}; use crate::statements::alter::AlterTable; -use crate::statements::copy::CopyTable; use crate::statements::create::{CreateDatabase, CreateExternalTable, CreateTable}; use crate::statements::delete::Delete; use crate::statements::describe::DescribeTable; @@ -60,7 +59,7 @@ pub enum Statement { Explain(Explain), Use(String), // COPY - Copy(CopyTable), + Copy(crate::statements::copy::Copy), Tql(Tql), } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index c71cb7382b..57fbcb3a1c 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -20,6 +20,7 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_query::AddColumnLocation; +use common_time::range::TimestampRange; use datatypes::prelude::VectorRef; use datatypes::schema::{ColumnSchema, RawSchema}; use serde::{Deserialize, Serialize}; @@ -268,6 +269,7 @@ pub struct CopyTableRequest { pub connection: HashMap, pub pattern: Option, pub direction: CopyDirection, + pub timestamp_range: Option, } #[derive(Debug, Clone, Default)] @@ -293,6 +295,16 @@ macro_rules! meter_insert_request { }; } +#[derive(Debug, Clone, Default, Deserialize, Serialize)] +pub struct CopyDatabaseRequest { + pub catalog_name: String, + pub schema_name: String, + pub location: String, + pub with: HashMap, + pub connection: HashMap, + pub time_range: Option, +} + #[cfg(test)] mod tests { use super::*; From e47ef1f0d2666a0d36c8db3b56b81a082dbe6dcc Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Tue, 20 Jun 2023 19:03:52 +0800 Subject: [PATCH 02/13] chore: minor fix (#1801) --- src/meta-client/src/client.rs | 4 +- src/meta-srv/src/service/store/etcd.rs | 51 +++++++++----------------- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 78a612f4a2..5842b431c4 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -756,7 +756,7 @@ mod tests { let tc = new_client("test_batch_put").await; let mut req = BatchPutRequest::new(); - for i in 0..256 { + for i in 0..275 { req = req.add_kv( tc.key(&format!("key-{}", i)), format!("value-{}", i).into_bytes(), @@ -769,7 +769,7 @@ mod tests { let req = RangeRequest::new().with_prefix(tc.key("key-")); let res = tc.client.range(req).await; let kvs = res.unwrap().take_kvs(); - assert_eq!(256, kvs.len()); + assert_eq!(275, kvs.len()); } #[tokio::test] diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 22834b355b..c97543137c 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -32,6 +32,10 @@ use crate::error::Result; use crate::metrics::METRIC_META_KV_REQUEST; use crate::service::store::kv::{KvStore, KvStoreRef}; +// Maximum number of operations permitted in a transaction. +// The etcd default configuration's `--max-txn-ops` is 128. +// +// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/ const MAX_TXN_SIZE: usize = 128; pub struct EtcdStore { @@ -55,7 +59,7 @@ impl EtcdStore { Ok(Arc::new(Self { client })) } - async fn do_multi_txn(&self, mut txn_ops: Vec) -> Result> { + async fn do_multi_txn(&self, txn_ops: Vec) -> Result> { if txn_ops.len() < MAX_TXN_SIZE { // fast path let txn = Txn::new().and_then(txn_ops); @@ -68,36 +72,17 @@ impl EtcdStore { return Ok(vec![txn_res]); } - let mut txns = vec![]; - loop { - if txn_ops.is_empty() { - break; - } + let txns = txn_ops + .chunks(MAX_TXN_SIZE) + .map(|part| async move { + let txn = Txn::new().and_then(part); + self.client.kv_client().txn(txn).await + }) + .collect::>(); - if txn_ops.len() < MAX_TXN_SIZE { - let txn = Txn::new().and_then(txn_ops); - txns.push(txn); - break; - } - - let part = txn_ops.drain(..MAX_TXN_SIZE).collect::>(); - let txn = Txn::new().and_then(part); - txns.push(txn); - } - - let mut txn_responses = Vec::with_capacity(txns.len()); - // Considering the pressure on etcd, it would be more appropriate to execute txn in - // a serial manner. - for txn in txns { - let txn_res = self - .client - .kv_client() - .txn(txn) - .await - .context(error::EtcdFailedSnafu)?; - txn_responses.push(txn_res); - } - Ok(txn_responses) + futures::future::try_join_all(txns) + .await + .context(error::EtcdFailedSnafu) } } @@ -241,7 +226,7 @@ impl KvStore for EtcdStore { prev_kvs.push(KvPair::from_etcd_kv(prev_kv)); } } - _ => unreachable!(), // never get here + _ => unreachable!(), } } } @@ -283,7 +268,7 @@ impl KvStore for EtcdStore { prev_kvs.push(KvPair::from_etcd_kv(kv)); }); } - _ => unreachable!(), // never get here + _ => unreachable!(), } } } @@ -343,7 +328,7 @@ impl KvStore for EtcdStore { let prev_kv = match op_res { TxnOpResponse::Put(res) => res.prev_key().map(KvPair::from_etcd_kv), TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::from_etcd_kv), - _ => unreachable!(), // never get here + _ => unreachable!(), }; let header = Some(ResponseHeader::success(cluster_id)); From 6205616301e8d73efcf010e1f87a2ae33126e0e9 Mon Sep 17 00:00:00 2001 From: LFC Date: Tue, 20 Jun 2023 19:17:35 +0800 Subject: [PATCH 03/13] fix: filter table regional values with the current node id (#1800) --- src/catalog/src/remote/manager.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index dc8b521793..eea2ddf67a 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -954,15 +954,26 @@ impl SchemaProvider for RemoteSchemaProvider { async fn table_names(&self) -> Result> { let key_prefix = build_table_regional_prefix(&self.catalog_name, &self.schema_name); let iter = self.backend.range(key_prefix.as_bytes()); - let table_names = iter + let regional_keys = iter .map(|kv| { let Kv(key, _) = kv?; let regional_key = TableRegionalKey::parse(String::from_utf8_lossy(&key)) .context(InvalidCatalogValueSnafu)?; - Ok(regional_key.table_name) + Ok(regional_key) }) - .try_collect() + .try_collect::>() .await?; + + let table_names = regional_keys + .into_iter() + .filter_map(|x| { + if x.node_id == self.node_id { + Some(x.table_name) + } else { + None + } + }) + .collect(); Ok(table_names) } From 3b91fc2c64575e2708b64e2936fe75513dd07078 Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Wed, 21 Jun 2023 03:50:08 +0100 Subject: [PATCH 04/13] feat: add initial implementation for status endpoint (#1789) * feat: add initial implementation for status endpoint * feat(status_endpoint): add more data to response * feat(status_endpoint): use build data env vars * feat(status_endpoint): add simple test * fix(status_endpoint): adjust the toml indentation --- src/cmd/Cargo.toml | 2 +- src/servers/Cargo.toml | 3 +++ src/servers/build.rs | 6 ++++++ src/servers/src/http.rs | 2 ++ src/servers/src/http/handler.rs | 24 +++++++++++++++++++++ src/servers/tests/http/http_handler_test.rs | 15 +++++++++++++ 6 files changed, 51 insertions(+), 1 deletion(-) diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 5bcd4392d2..c326ab7b05 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -52,4 +52,4 @@ serde.workspace = true toml = "0.5" [build-dependencies] -build-data = "0.1.3" +build-data = "0.1.4" diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 8621aa0522..d0f1523beb 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -103,3 +103,6 @@ table = { path = "../table" } tokio-postgres = "0.7" tokio-postgres-rustls = "0.10" tokio-test = "0.4" + +[build-dependencies] +build-data = "0.1.4" diff --git a/src/servers/build.rs b/src/servers/build.rs index e1e0ea9bb6..3803db9e29 100644 --- a/src/servers/build.rs +++ b/src/servers/build.rs @@ -13,6 +13,12 @@ // limitations under the License. fn main() { + build_data::set_RUSTC_VERSION(); + build_data::set_BUILD_HOSTNAME(); + build_data::set_GIT_BRANCH(); + build_data::set_GIT_COMMIT(); + build_data::set_SOURCE_TIMESTAMP(); + #[cfg(feature = "dashboard")] fetch_dashboard_assets(); } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 0b28ae0623..c1b809e5e5 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -512,6 +512,8 @@ impl HttpServer { routing::get(handler::health).post(handler::health), ); + router = router.route("/status", routing::get(handler::status)); + #[cfg(feature = "dashboard")] { if !self.options.disable_dashboard { diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 04ae53aa18..d91a7d4b58 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::env; use std::time::Instant; use aide::transform::TransformOperation; @@ -158,3 +159,26 @@ pub struct HealthResponse {} pub async fn health(Query(_params): Query) -> Json { Json(HealthResponse {}) } + +#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq, Eq)] +pub struct StatusResponse<'a> { + pub source_time: &'a str, + pub commit: &'a str, + pub branch: &'a str, + pub rustc_version: &'a str, + pub hostname: &'a str, + pub version: &'a str, +} + +/// Handler to expose information info about runtime, build, etc. +#[axum_macros::debug_handler] +pub async fn status() -> Json> { + Json(StatusResponse { + source_time: env!("SOURCE_TIMESTAMP"), + commit: env!("GIT_COMMIT"), + branch: env!("GIT_BRANCH"), + rustc_version: env!("RUSTC_VERSION"), + hostname: env!("BUILD_HOSTNAME"), + version: env!("CARGO_PKG_VERSION"), + }) +} diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 4d3698c150..404df0ff75 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -365,3 +365,18 @@ async fn test_health() { expected_json_str ); } + +#[tokio::test] +async fn test_status() { + let expected_json = http_handler::StatusResponse { + source_time: env!("SOURCE_TIMESTAMP"), + commit: env!("GIT_COMMIT"), + branch: env!("GIT_BRANCH"), + rustc_version: env!("RUSTC_VERSION"), + hostname: env!("BUILD_HOSTNAME"), + version: env!("CARGO_PKG_VERSION"), + }; + + let Json(json) = http_handler::status().await; + assert_eq!(json, expected_json); +} From 23bf55a2659ee805b107e1d45f9f11645d560c80 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 21 Jun 2023 10:59:58 +0800 Subject: [PATCH 05/13] fix: __field__ matcher on single value column (#1805) * fix error text and field_column_names Signed-off-by: Ruihang Xia * add sqlness test Signed-off-by: Ruihang Xia * add empty line Signed-off-by: Ruihang Xia * improve style Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/promql/src/planner.rs | 6 +-- src/table/src/metadata.rs | 17 ++++----- .../tql-explain-analyze/explain.result | 38 +++++++++++++++++++ .../tql-explain-analyze/explain.sql | 15 ++++++++ 4 files changed, 64 insertions(+), 12 deletions(-) diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index c282d25f3c..51091e89f9 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -541,7 +541,7 @@ impl PromPlanner { result_set.insert(matcher.value.clone()); } else { return Err(ColumnNotFoundSnafu { - col: self.ctx.table_name.clone().unwrap(), + col: matcher.value.clone(), } .build()); } @@ -550,8 +550,8 @@ impl PromPlanner { if col_set.contains(&matcher.value) { reverse_set.insert(matcher.value.clone()); } else { - return Err(ValueNotFoundSnafu { - table: self.ctx.table_name.clone().unwrap(), + return Err(ColumnNotFoundSnafu { + col: matcher.value.clone(), } .build()); } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 8e5b183f2a..e857ddf498 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -162,15 +162,14 @@ impl TableMeta { } pub fn field_column_names(&self) -> impl Iterator { - let columns_schemas = &self.schema.column_schemas(); - self.value_indices.iter().filter_map(|idx| { - let column = &columns_schemas[*idx]; - if column.is_time_index() { - None - } else { - Some(&column.name) - } - }) + // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE + let columns_schemas = self.schema.column_schemas(); + let primary_key_indices = &self.primary_key_indices; + columns_schemas + .iter() + .enumerate() + .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index()) + .map(|(_, cs)| &cs.name) } /// Returns the new [TableMetaBuilder] after applying given `alter_kind`. diff --git a/tests/cases/distributed/tql-explain-analyze/explain.result b/tests/cases/distributed/tql-explain-analyze/explain.result index 7fb8a4384f..1dc38f1f2f 100644 --- a/tests/cases/distributed/tql-explain-analyze/explain.result +++ b/tests/cases/distributed/tql-explain-analyze/explain.result @@ -31,3 +31,41 @@ DROP TABLE test; Affected Rows: 1 +CREATE TABLE host_load1 ( + ts TIMESTAMP(3) NOT NULL, + collector STRING NULL, + host STRING NULL, + val DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY (collector, host) +); + +Affected Rows: 0 + +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +TQL EXPLAIN host_load1{__field__="val"}; + ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] | +| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [false] | +| | PromSeriesDivide: tags=["collector", "host"] | +| | Sort: host_load1.collector DESC NULLS LAST, host_load1.host DESC NULLS LAST, host_load1.ts DESC NULLS LAST | +| | Projection: host_load1.val, host_load1.collector, host_load1.host, host_load1.ts | +| | MergeScan [is_placeholder=false] | +| | TableScan: host_load1 projection=[ts, collector, host, val], partial_filters=[ts >= TimestampMillisecond(-300000, None), ts <= TimestampMillisecond(300000, None)] | +| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts] | +| | PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false] | +| | PromSeriesDivideExec: tags=["collector", "host"] | +| | RepartitionExec: partitioning=REDACTED +| | ProjectionExec: expr=[val@3 as val, collector@1 as collector, host@2 as host, ts@0 as ts] | +| | MergeScanExec: peers=[REDACTED +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + +DROP TABLE host_load1; + +Affected Rows: 1 + diff --git a/tests/cases/distributed/tql-explain-analyze/explain.sql b/tests/cases/distributed/tql-explain-analyze/explain.sql index 58f7bcf1a6..d0f237d0aa 100644 --- a/tests/cases/distributed/tql-explain-analyze/explain.sql +++ b/tests/cases/distributed/tql-explain-analyze/explain.sql @@ -9,3 +9,18 @@ INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a"); TQL EXPLAIN (0, 10, '5s') test; DROP TABLE test; + +CREATE TABLE host_load1 ( + ts TIMESTAMP(3) NOT NULL, + collector STRING NULL, + host STRING NULL, + val DOUBLE NULL, + TIME INDEX (ts), + PRIMARY KEY (collector, host) +); + +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (peer-.*) REDACTED +TQL EXPLAIN host_load1{__field__="val"}; + +DROP TABLE host_load1; From 5335203360886d469b793ee356cb8f591024b3f6 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 21 Jun 2023 14:08:45 +0800 Subject: [PATCH 06/13] feat: support cross compilation to aarch64 linux (#1802) --- Cross.toml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 Cross.toml diff --git a/Cross.toml b/Cross.toml new file mode 100644 index 0000000000..2eb1c0d2a7 --- /dev/null +++ b/Cross.toml @@ -0,0 +1,7 @@ +[build] +pre-build = [ + "dpkg --add-architecture $CROSS_DEB_ARCH", + "apt update && apt install -y unzip zlib1g-dev:$CROSS_DEB_ARCH", + "curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v3.15.8/protoc-3.15.8-linux-x86_64.zip && unzip protoc-3.15.8-linux-x86_64.zip -d /usr/", + "chmod a+x /usr/bin/protoc && chmod -R a+rx /usr/include/google", +] From fa522bc57979b581ac84f2dfe9add6de22f2291a Mon Sep 17 00:00:00 2001 From: LFC Date: Wed, 21 Jun 2023 14:49:32 +0800 Subject: [PATCH 07/13] fix: drop region alive countdown tasks when deregistering table (#1808) --- Cargo.lock | 1 + src/catalog/src/remote/region_alive_keeper.rs | 60 ++++++++++++------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 24f9769e5d..d9e8c1ca52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8520,6 +8520,7 @@ dependencies = [ "axum-macros", "axum-test-helper", "base64 0.13.1", + "build-data", "bytes", "catalog", "chrono", diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index 61daee4cf1..9b64e35559 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -104,10 +104,14 @@ impl RegionAliveKeepers { Ok(()) } - pub async fn deregister_table(&self, table_ident: &TableIdent) { - if self.keepers.lock().await.remove(table_ident).is_some() { + pub async fn deregister_table( + &self, + table_ident: &TableIdent, + ) -> Option> { + self.keepers.lock().await.remove(table_ident).map(|x| { info!("Deregister RegionAliveKeeper for table {table_ident}"); - } + x + }) } pub async fn register_region(&self, region_ident: &RegionIdent) { @@ -127,7 +131,7 @@ impl RegionAliveKeepers { warn!("Alive keeper for region {region_ident} is not found!"); return; }; - keeper.deregister_region(region_ident.region_number).await + let _ = keeper.deregister_region(region_ident.region_number).await; } pub async fn start(&self) { @@ -230,9 +234,11 @@ impl RegionAliveKeeper { return; } - let countdown_task_handles = self.countdown_task_handles.clone(); + let countdown_task_handles = Arc::downgrade(&self.countdown_task_handles); let on_task_finished = async move { - let _ = countdown_task_handles.lock().await.remove(®ion); + if let Some(x) = countdown_task_handles.upgrade() { + x.lock().await.remove(®ion); + } // Else the countdown task handles map could be dropped because the keeper is dropped. }; let handle = Arc::new(CountdownTaskHandle::new( self.table_engine.clone(), @@ -259,19 +265,18 @@ impl RegionAliveKeeper { } } - async fn deregister_region(&self, region: RegionNumber) { - if self - .countdown_task_handles + async fn deregister_region(&self, region: RegionNumber) -> Option> { + self.countdown_task_handles .lock() .await .remove(®ion) - .is_some() - { - info!( - "Deregister alive countdown for region {region} in table {}", - self.table_ident - ) - } + .map(|x| { + info!( + "Deregister alive countdown for region {region} in table {}", + self.table_ident + ); + x + }) } async fn start(&self) { @@ -319,6 +324,8 @@ enum CountdownCommand { struct CountdownTaskHandle { tx: mpsc::Sender, handler: JoinHandle<()>, + table_ident: TableIdent, + region: RegionNumber, } impl CountdownTaskHandle { @@ -341,7 +348,7 @@ impl CountdownTaskHandle { let mut countdown_task = CountdownTask { table_engine, - table_ident, + table_ident: table_ident.clone(), region, rx, }; @@ -350,7 +357,12 @@ impl CountdownTaskHandle { on_task_finished().await; }); - Self { tx, handler } + Self { + tx, + handler, + table_ident, + region, + } } async fn start(&self, heartbeat_interval_millis: u64) { @@ -378,7 +390,11 @@ impl CountdownTaskHandle { impl Drop for CountdownTaskHandle { fn drop(&mut self) { - self.handler.abort() + debug!( + "Aborting region alive countdown task for region {} in table {}", + self.region, self.table_ident, + ); + self.handler.abort(); } } @@ -640,7 +656,8 @@ mod test { regions.sort(); assert_eq!(regions, vec![2, 3, 4]); - keepers.deregister_table(&table_ident).await; + let keeper = keepers.deregister_table(&table_ident).await.unwrap(); + assert!(Arc::try_unwrap(keeper).is_ok(), "keeper is not dropped"); assert!(keepers.keepers.lock().await.is_empty()); } @@ -676,7 +693,8 @@ mod test { // assert keep_lived works if keeper is started assert!(keeper.deadline(region).await.unwrap() <= ten_seconds_later()); - keeper.deregister_region(region).await; + let handle = keeper.deregister_region(region).await.unwrap(); + assert!(Arc::try_unwrap(handle).is_ok(), "handle is not dropped"); assert!(keeper.find_handle(®ion).await.is_none()); } From a314993ab4bec20ffe8e1b31c5ba7e749be5d224 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 21 Jun 2023 15:34:24 +0800 Subject: [PATCH 08/13] chore: change logstore default config (#1809) --- config/datanode.example.toml | 4 ++-- config/standalone.example.toml | 6 +++--- src/datanode/src/datanode.rs | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index a3d8fa2494..70b75d361d 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -26,8 +26,8 @@ tcp_nodelay = true [wal] # WAL data directory # dir = "/tmp/greptimedb/wal" -file_size = "1GB" -purge_threshold = "50GB" +file_size = "256MB" +purge_threshold = "4GB" purge_interval = "10m" read_batch_size = 128 sync_write = false diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 2aa0240b69..87e7b9e95c 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -81,9 +81,9 @@ addr = "127.0.0.1:4004" # WAL data directory # dir = "/tmp/greptimedb/wal" # WAL file size in bytes. -file_size = "1GB" -# WAL purge threshold in bytes. -purge_threshold = "50GB" +file_size = "256MB" +# WAL purge threshold. +purge_threshold = "4GB" # WAL purge interval in seconds. purge_interval = "10m" # WAL read batch size. diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 54d777d3f6..5f6c7d2562 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -192,8 +192,8 @@ impl Default for WalConfig { fn default() -> Self { Self { dir: None, - file_size: ReadableSize::gb(1), // log file size 1G - purge_threshold: ReadableSize::gb(50), // purge threshold 50G + file_size: ReadableSize::mb(256), // log file size 256MB + purge_threshold: ReadableSize::gb(4), // purge threshold 4GB purge_interval: Duration::from_secs(600), read_batch_size: 128, sync_write: false, From d1b5ce0d35203789536a703854d02b7d6cfdff27 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 21 Jun 2023 16:09:11 +0800 Subject: [PATCH 09/13] chore: check catalog deregister result (#1810) * chore: check deregister result and return error on failure * refactor: SystemCatalog::deregister_table returns Result<()> --- src/catalog/src/local/manager.rs | 5 +---- src/catalog/src/remote/manager.rs | 2 +- src/catalog/src/tables.rs | 14 ++++++++++++-- src/table-procedure/src/drop.rs | 14 +++++++++++--- src/table-procedure/src/error.rs | 7 ++++++- 5 files changed, 31 insertions(+), 11 deletions(-) diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 1d66e2b961..3dd9d777fd 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -467,10 +467,7 @@ impl CatalogManager for LocalCatalogManager { .ident .table_id; - if !self.system.deregister_table(&request, table_id).await? { - return Ok(false); - } - + self.system.deregister_table(&request, table_id).await?; self.catalogs.deregister_table(request).await } } diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index eea2ddf67a..36545df835 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -662,7 +662,7 @@ impl CatalogManager for RemoteCatalogManager { .await; } - Ok(result.is_none()) + Ok(true) } async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index baf8e24fa2..175e0799a7 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_catalog::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME}; +use common_telemetry::logging; use snafu::ResultExt; use table::metadata::TableId; use table::{Table, TableRef}; @@ -91,12 +92,21 @@ impl SystemCatalog { &self, request: &DeregisterTableRequest, table_id: TableId, - ) -> CatalogResult { + ) -> CatalogResult<()> { self.information_schema .system .delete(build_table_deletion_request(request, table_id)) .await - .map(|x| x == 1) + .map(|x| { + if x != 1 { + let table = common_catalog::format_full_table_name( + &request.catalog, + &request.schema, + &request.table_name + ); + logging::warn!("Failed to delete table record from information_schema, unexpected returned result: {x}, table: {table}"); + } + }) .with_context(|_| error::DeregisterTableSnafu { request: request.clone(), }) diff --git a/src/table-procedure/src/drop.rs b/src/table-procedure/src/drop.rs index 9c436f6a9c..9a8c5314b4 100644 --- a/src/table-procedure/src/drop.rs +++ b/src/table-procedure/src/drop.rs @@ -28,7 +28,8 @@ use table::engine::{EngineContext, TableEngineProcedureRef, TableReference}; use table::requests::DropTableRequest; use crate::error::{ - AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu, + AccessCatalogSnafu, DeregisterTableSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, + TableNotFoundSnafu, }; /// Procedure to drop a table. @@ -158,10 +159,17 @@ impl DropTableProcedure { schema: self.data.request.schema_name.clone(), table_name: self.data.request.table_name.clone(), }; - self.catalog_manager + if !self + .catalog_manager .deregister_table(deregister_table_req) .await - .context(AccessCatalogSnafu)?; + .context(AccessCatalogSnafu)? + { + return DeregisterTableSnafu { + name: request.table_ref().to_string(), + } + .fail()?; + } } self.data.state = DropTableState::EngineDropTable; diff --git a/src/table-procedure/src/error.rs b/src/table-procedure/src/error.rs index 25cca7c910..9ab6ec0c96 100644 --- a/src/table-procedure/src/error.rs +++ b/src/table-procedure/src/error.rs @@ -55,6 +55,9 @@ pub enum Error { #[snafu(display("Table already exists: {}", name))] TableExists { name: String }, + + #[snafu(display("Failed to deregister table: {}", name))] + DeregisterTable { name: String }, } pub type Result = std::result::Result; @@ -64,7 +67,9 @@ impl ErrorExt for Error { use Error::*; match self { - SerializeProcedure { .. } | DeserializeProcedure { .. } => StatusCode::Internal, + DeregisterTable { .. } | SerializeProcedure { .. } | DeserializeProcedure { .. } => { + StatusCode::Internal + } InvalidRawSchema { source, .. } => source.status_code(), AccessCatalog { source, .. } => source.status_code(), CatalogNotFound { .. } | SchemaNotFound { .. } | TableExists { .. } => { From b1ccc7ef5dbacfa7f3ca1d256848358c6916f029 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 21 Jun 2023 16:25:50 +0800 Subject: [PATCH 10/13] fix: prevent filter pushdown in distributed planner (#1806) * fix: prevent filter pushdown in distributed planner Signed-off-by: Ruihang Xia * fix metadata Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/query/src/dist_plan/commutativity.rs | 49 ++++++++++++++++++- .../optimizer/filter_push_down.result | 16 +++++- .../standalone/common/select/like.result | 29 +++++++++++ tests/cases/standalone/common/select/like.sql | 15 ++++++ 4 files changed, 105 insertions(+), 4 deletions(-) create mode 100644 tests/cases/standalone/common/select/like.result create mode 100644 tests/cases/standalone/common/select/like.sql diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 82344a856c..8f63aac051 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; +use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; use promql::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, }; @@ -37,7 +37,8 @@ impl Categorizer { pub fn check_plan(plan: &LogicalPlan) -> Commutativity { match plan { LogicalPlan::Projection(_) => Commutativity::Unimplemented, - LogicalPlan::Filter(_) => Commutativity::Commutative, + // TODO(ruihang): Change this to Commutative once Like is supported in substrait + LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate), LogicalPlan::Window(_) => Commutativity::Unimplemented, LogicalPlan::Aggregate(_) => { // check all children exprs and uses the strictest level @@ -85,6 +86,50 @@ impl Categorizer { _ => Commutativity::Unsupported, } } + + pub fn check_expr(expr: &Expr) -> Commutativity { + match expr { + Expr::Alias(_, _) + | Expr::Column(_) + | Expr::ScalarVariable(_, _) + | Expr::Literal(_) + | Expr::BinaryExpr(_) + | Expr::Not(_) + | Expr::IsNotNull(_) + | Expr::IsNull(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + | Expr::Negative(_) + | Expr::Between(_) + | Expr::Sort(_) + | Expr::Exists(_) => Commutativity::Commutative, + + Expr::Like(_) + | Expr::ILike(_) + | Expr::SimilarTo(_) + | Expr::IsUnknown(_) + | Expr::IsNotUnknown(_) + | Expr::GetIndexedField(_) + | Expr::Case(_) + | Expr::Cast(_) + | Expr::TryCast(_) + | Expr::ScalarFunction(_) + | Expr::ScalarUDF(_) + | Expr::AggregateFunction(_) + | Expr::WindowFunction(_) + | Expr::AggregateUDF(_) + | Expr::InList(_) + | Expr::InSubquery(_) + | Expr::ScalarSubquery(_) + | Expr::Wildcard => Commutativity::Unimplemented, + Expr::QualifiedWildcard { .. } + | Expr::GroupingSet(_) + | Expr::Placeholder(_) + | Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented, + } + } } pub type Transformer = Arc Option>; diff --git a/tests/cases/distributed/optimizer/filter_push_down.result b/tests/cases/distributed/optimizer/filter_push_down.result index 5fe0b3ef60..6859a0b7ed 100644 --- a/tests/cases/distributed/optimizer/filter_push_down.result +++ b/tests/cases/distributed/optimizer/filter_push_down.result @@ -90,11 +90,23 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=1 WHERE i1.i= SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i; -Error: 3001(EngineExecuteQuery), No field named __correlated_sq_1.i. Valid fields are integers.i, integers.j. ++---+---+ +| i | j | ++---+---+ +| 1 | 1 | +| 2 | 2 | +| 3 | 3 | ++---+---+ SELECT * FROM integers WHERE i NOT IN ((SELECT i FROM integers WHERE i=1)) ORDER BY i; -Error: 3001(EngineExecuteQuery), No field named __correlated_sq_2.i. Valid fields are integers.i, integers.j. ++---+---+ +| i | j | ++---+---+ +| 2 | 2 | +| 3 | 3 | +| | 4 | ++---+---+ SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) AND i<3 ORDER BY i; diff --git a/tests/cases/standalone/common/select/like.result b/tests/cases/standalone/common/select/like.result new file mode 100644 index 0000000000..91b648e1a8 --- /dev/null +++ b/tests/cases/standalone/common/select/like.result @@ -0,0 +1,29 @@ +CREATE TABLE host ( + ts TIMESTAMP(3) TIME INDEX, + host STRING PRIMARY KEY, + val DOUBLE, +); + +Affected Rows: 0 + +INSERT INTO TABLE host VALUES + (0, 'a+b', 1.0), + (1, 'b+c', 2.0), + (2, 'a', 3.0), + (3, 'c', 4.0); + +Affected Rows: 4 + +SELECT * FROM host WHERE host LIKE '%+%'; + ++-------------------------+------+-----+ +| ts | host | val | ++-------------------------+------+-----+ +| 1970-01-01T00:00:00 | a+b | 1.0 | +| 1970-01-01T00:00:00.001 | b+c | 2.0 | ++-------------------------+------+-----+ + +DROP TABLE host; + +Affected Rows: 1 + diff --git a/tests/cases/standalone/common/select/like.sql b/tests/cases/standalone/common/select/like.sql new file mode 100644 index 0000000000..b4ef76bb93 --- /dev/null +++ b/tests/cases/standalone/common/select/like.sql @@ -0,0 +1,15 @@ +CREATE TABLE host ( + ts TIMESTAMP(3) TIME INDEX, + host STRING PRIMARY KEY, + val DOUBLE, +); + +INSERT INTO TABLE host VALUES + (0, 'a+b', 1.0), + (1, 'b+c', 2.0), + (2, 'a', 3.0), + (3, 'c', 4.0); + +SELECT * FROM host WHERE host LIKE '%+%'; + +DROP TABLE host; From 5ab074709206c539657d91ac8766065addd3b084 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 21 Jun 2023 19:04:34 +0900 Subject: [PATCH 11/13] test(storage): wait task before checking scheduled task num (#1811) --- src/storage/src/scheduler.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/storage/src/scheduler.rs b/src/storage/src/scheduler.rs index 2c8d1c162a..4f6d1689cd 100644 --- a/src/storage/src/scheduler.rs +++ b/src/storage/src/scheduler.rs @@ -312,7 +312,7 @@ where #[cfg(test)] mod tests { - use std::sync::atomic::AtomicI32; + use std::sync::atomic::{AtomicBool, AtomicI32}; use std::time::Duration; use store_api::storage::RegionId; @@ -564,7 +564,9 @@ mod tests { let task_scheduled = Arc::new(AtomicI32::new(0)); let task_scheduled_cloned = task_scheduled.clone(); - common_runtime::spawn_write(async move { + let scheduling = Arc::new(AtomicBool::new(true)); + let scheduling_clone = scheduling.clone(); + let handle = common_runtime::spawn_write(async move { for i in 0..10000 { if let Ok(res) = scheduler_cloned.schedule(MockRequest { region_id: i as RegionId, @@ -573,12 +575,19 @@ mod tests { task_scheduled_cloned.fetch_add(1, Ordering::Relaxed); } } + + if !scheduling_clone.load(Ordering::Relaxed) { + break; + } } }); - tokio::time::sleep(Duration::from_millis(1)).await; scheduler.stop(true).await.unwrap(); + scheduling.store(false, Ordering::Relaxed); + let finished = finished.load(Ordering::Relaxed); + handle.await.unwrap(); + assert_eq!(finished, task_scheduled.load(Ordering::Relaxed)); } } From 5e06be51e41a132b420d7c0b1190b49e69a87b0c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 21 Jun 2023 15:07:52 +0800 Subject: [PATCH 12/13] add metrics in some interfaces Signed-off-by: Ruihang Xia --- src/common/query/src/physical_plan.rs | 24 ++++++++++++++++++++++-- src/common/recordbatch/src/adapter.rs | 27 ++++++++++++++++++++++++++- src/table/src/table/scan.rs | 7 +++++++ 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs index 144e1bcd0b..1c148020ac 100644 --- a/src/common/query/src/physical_plan.rs +++ b/src/common/query/src/physical_plan.rs @@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::error::Result as DfResult; pub use datafusion::execution::context::{SessionContext, TaskContext}; use datafusion::physical_plan::expressions::PhysicalSortExpr; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; pub use datafusion::physical_plan::Partitioning; use datafusion::physical_plan::Statistics; use datatypes::schema::SchemaRef; @@ -69,6 +70,10 @@ pub trait PhysicalPlan: Debug + Send + Sync { partition: usize, context: Arc, ) -> Result; + + fn metrics(&self) -> Option { + None + } } /// Adapt DataFusion's [`ExecutionPlan`](DfPhysicalPlan) to GreptimeDB's [`PhysicalPlan`]. @@ -76,11 +81,16 @@ pub trait PhysicalPlan: Debug + Send + Sync { pub struct PhysicalPlanAdapter { schema: SchemaRef, df_plan: Arc, + metric: ExecutionPlanMetricsSet, } impl PhysicalPlanAdapter { pub fn new(schema: SchemaRef, df_plan: Arc) -> Self { - Self { schema, df_plan } + Self { + schema, + df_plan, + metric: ExecutionPlanMetricsSet::new(), + } } pub fn df_plan(&self) -> Arc { @@ -127,15 +137,21 @@ impl PhysicalPlan for PhysicalPlanAdapter { partition: usize, context: Arc, ) -> Result { + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let df_plan = self.df_plan.clone(); let stream = df_plan .execute(partition, context) .context(error::GeneralDataFusionSnafu)?; - let adapter = RecordBatchStreamAdapter::try_new(stream) + let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric) .context(error::ConvertDfRecordBatchStreamSnafu)?; Ok(Box::pin(adapter)) } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } } #[derive(Debug)] @@ -196,6 +212,10 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter { fn statistics(&self) -> Statistics { Statistics::default() } + + fn metrics(&self) -> Option { + self.0.metrics() + } } #[cfg(test)] diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 260b52fe41..24d5c2af4a 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -20,6 +20,7 @@ use std::task::{Context, Poll}; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::error::Result as DfResult; use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream}; +use datafusion::physical_plan::metrics::BaselineMetrics; use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream; use datafusion_common::DataFusionError; use datatypes::schema::{Schema, SchemaRef}; @@ -115,13 +116,31 @@ impl Stream for DfRecordBatchStreamAdapter { pub struct RecordBatchStreamAdapter { schema: SchemaRef, stream: DfSendableRecordBatchStream, + metrics: Option, } impl RecordBatchStreamAdapter { pub fn try_new(stream: DfSendableRecordBatchStream) -> Result { let schema = Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?); - Ok(Self { schema, stream }) + Ok(Self { + schema, + stream, + metrics: None, + }) + } + + pub fn try_new_with_metrics( + stream: DfSendableRecordBatchStream, + metrics: BaselineMetrics, + ) -> Result { + let schema = + Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?); + Ok(Self { + schema, + stream, + metrics: Some(metrics), + }) } } @@ -135,6 +154,12 @@ impl Stream for RecordBatchStreamAdapter { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let timer = self + .metrics + .as_ref() + .map(|m| m.elapsed_compute().clone()) + .unwrap_or_default(); + let _guard = timer.timer(); match Pin::new(&mut self.stream).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Some(df_record_batch)) => { diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index c510dbdde8..ab6446684a 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -21,6 +21,7 @@ use common_query::error::Result as QueryResult; use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; use common_recordbatch::SendableRecordBatchStream; use datafusion::execution::context::TaskContext; +use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_expr::PhysicalSortExpr; use datatypes::schema::SchemaRef; use snafu::OptionExt; @@ -30,6 +31,7 @@ pub struct StreamScanAdapter { stream: Mutex>, schema: SchemaRef, output_ordering: Option>, + metric: ExecutionPlanMetricsSet, } impl Debug for StreamScanAdapter { @@ -49,6 +51,7 @@ impl StreamScanAdapter { stream: Mutex::new(Some(stream)), schema, output_ordering: None, + metric: ExecutionPlanMetricsSet::new(), } } @@ -91,6 +94,10 @@ impl PhysicalPlan for StreamScanAdapter { let mut stream = self.stream.lock().unwrap(); stream.take().context(query_error::ExecuteRepeatedlySnafu) } + + fn metrics(&self) -> Option { + Some(self.metric.clone_inner()) + } } #[cfg(test)] From 7ddfa9d3e487973fa641e216e2b8e2a9874bb1a8 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 21 Jun 2023 17:18:13 +0800 Subject: [PATCH 13/13] calc elapsed time and rows Signed-off-by: Ruihang Xia --- src/table/src/table/scan.rs | 43 +++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index ab6446684a..cdc2d17b79 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -14,16 +14,20 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; +use std::pin::Pin; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use common_query::error as query_error; use common_query::error::Result as QueryResult; use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::error::Result as RecordBatchResult; +use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datafusion::execution::context::TaskContext; -use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_expr::PhysicalSortExpr; use datatypes::schema::SchemaRef; +use futures::{Stream, StreamExt}; use snafu::OptionExt; /// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan]. @@ -88,11 +92,16 @@ impl PhysicalPlan for StreamScanAdapter { fn execute( &self, - _partition: usize, + partition: usize, _context: Arc, ) -> QueryResult { let mut stream = self.stream.lock().unwrap(); - stream.take().context(query_error::ExecuteRepeatedlySnafu) + let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?; + let baseline_metric = BaselineMetrics::new(&self.metric, partition); + Ok(Box::pin(StreamWithMetricWrapper { + stream, + metric: baseline_metric, + })) } fn metrics(&self) -> Option { @@ -100,6 +109,32 @@ impl PhysicalPlan for StreamScanAdapter { } } +pub struct StreamWithMetricWrapper { + stream: SendableRecordBatchStream, + metric: BaselineMetrics, +} + +impl Stream for StreamWithMetricWrapper { + type Item = RecordBatchResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let _timer = this.metric.elapsed_compute().timer(); + let poll = this.stream.poll_next_unpin(cx); + if let Poll::Ready(Option::Some(Result::Ok(record_batch))) = &poll { + this.metric.record_output(record_batch.num_rows()); + } + + poll + } +} + +impl RecordBatchStream for StreamWithMetricWrapper { + fn schema(&self) -> SchemaRef { + self.stream.schema() + } +} + #[cfg(test)] mod test { use common_recordbatch::{util, RecordBatch, RecordBatches};