From 2b2fd80bf475f04f284150b935a7108a8d1273d5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 23 Mar 2024 17:31:16 +0800 Subject: [PATCH] feat: return new added columns in region server's extension response (#3533) * feat: adapt the new proto response Signed-off-by: Ruihang Xia * update interfaces Signed-off-by: Ruihang Xia * write columns to extension Signed-off-by: Ruihang Xia * use physical column's schema Signed-off-by: Ruihang Xia * sort logical columns by name Signed-off-by: Ruihang Xia * format code Signed-off-by: Ruihang Xia * return physical table's column Signed-off-by: Ruihang Xia * Update src/common/meta/src/datanode_manager.rs Co-authored-by: JeremyHi * implement sort column logic Signed-off-by: Ruihang Xia * proxy create table procedure to create logical table Signed-off-by: Ruihang Xia * add unit test for sort_columns Signed-off-by: Ruihang Xia * update sqlness cases Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Co-authored-by: JeremyHi --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/client/src/region.rs | 36 ++--- src/common/meta/src/datanode_manager.rs | 29 +++- .../meta/src/ddl/create_logical_tables.rs | 8 +- .../src/ddl/tests/create_logical_tables.rs | 7 +- src/common/meta/src/ddl/tests/create_table.rs | 13 +- src/common/meta/src/rpc/ddl.rs | 152 +++++++++++++++++- src/common/meta/src/test_util.rs | 8 +- src/datanode/src/region_server.rs | 35 ++-- src/datanode/src/tests.rs | 10 +- src/datatypes/src/schema/column_schema.rs | 11 ++ src/file-engine/src/engine.rs | 11 +- src/frontend/src/instance/standalone.rs | 8 +- src/meta-srv/src/procedure/utils.rs | 1 + src/metric-engine/src/data_region.rs | 36 +++-- src/metric-engine/src/engine.rs | 39 +++-- src/metric-engine/src/engine/alter.rs | 34 +++- src/metric-engine/src/engine/create.rs | 81 ++++++++-- src/metric-engine/src/engine/put.rs | 8 +- src/metric-engine/src/engine/read.rs | 6 +- .../src/engine/region_metadata.rs | 3 +- src/metric-engine/src/error.rs | 19 ++- src/metric-engine/src/metadata_region.rs | 2 +- src/mito2/src/engine.rs | 5 +- src/mito2/src/engine/basic_test.rs | 4 +- src/mito2/src/engine/compaction_test.rs | 16 +- src/mito2/src/test_util.rs | 12 +- src/operator/src/delete.rs | 5 +- src/operator/src/insert.rs | 5 +- src/operator/src/request.rs | 5 +- src/operator/src/statement/ddl.rs | 17 ++ src/store-api/src/metadata.rs | 10 ++ src/store-api/src/metric_engine_consts.rs | 4 + src/store-api/src/region_engine.rs | 20 ++- src/table/src/metadata.rs | 6 + tests-integration/src/prom_store.rs | 8 +- .../common/create/create_metric_table.result | 4 +- .../common/insert/logical_metric_table.result | 28 ++-- .../common/insert/logical_metric_table.sql | 4 +- 40 files changed, 543 insertions(+), 171 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 385ae145c4..9a12234862 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3870,7 +3870,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=96f1f0404f421ee560a4310c73c5071e49168168#96f1f0404f421ee560a4310c73c5071e49168168" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=349cb385583697f41010dabeb3c106d58f9599b4#349cb385583697f41010dabeb3c106d58f9599b4" dependencies = [ "prost 0.12.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 4681c65354..509d7f84c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,7 +103,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = 
"0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96f1f0404f421ee560a4310c73c5071e49168168" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "349cb385583697f41010dabeb3c106d58f9599b4" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 819c4453f9..39c5ddddcc 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use api::v1::region::{QueryRequest, RegionRequest, RegionResponse}; +use api::v1::region::{QueryRequest, RegionRequest}; use api::v1::ResponseHeader; use arc_swap::ArcSwapOption; use arrow_flight::Ticket; @@ -23,7 +23,7 @@ use async_trait::async_trait; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_grpc::flight::{FlightDecoder, FlightMessage}; -use common_meta::datanode_manager::{AffectedRows, Datanode}; +use common_meta::datanode_manager::{Datanode, HandleResponse}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; @@ -46,7 +46,7 @@ pub struct RegionRequester { #[async_trait] impl Datanode for RegionRequester { - async fn handle(&self, request: RegionRequest) -> MetaResult { + async fn handle(&self, request: RegionRequest) -> MetaResult { self.handle_inner(request).await.map_err(|err| { if err.should_retry() { meta_error::Error::RetryLater { @@ -165,7 +165,7 @@ impl RegionRequester { Ok(Box::pin(record_batch_stream)) } - async fn handle_inner(&self, request: RegionRequest) -> Result { + async fn handle_inner(&self, request: RegionRequest) -> Result { let request_type = request .body .as_ref() @@ -178,10 +178,7 @@ impl RegionRequester { let mut client = self.client.raw_region_client()?; - let RegionResponse { - header, - affected_rows, - } = client + let response = client .handle(request) .await .map_err(|e| { @@ -195,19 +192,20 @@ impl RegionRequester { })? .into_inner(); - check_response_header(header)?; + check_response_header(&response.header)?; - Ok(affected_rows as _) + Ok(HandleResponse::from_region_response(response)) } - pub async fn handle(&self, request: RegionRequest) -> Result { + pub async fn handle(&self, request: RegionRequest) -> Result { self.handle_inner(request).await } } -pub fn check_response_header(header: Option) -> Result<()> { +pub fn check_response_header(header: &Option) -> Result<()> { let status = header - .and_then(|header| header.status) + .as_ref() + .and_then(|header| header.status.as_ref()) .context(IllegalDatabaseResponseSnafu { err_msg: "either response header or status is missing", })?; @@ -221,7 +219,7 @@ pub fn check_response_header(header: Option) -> Result<()> { })?; ServerSnafu { code, - msg: status.err_msg, + msg: status.err_msg.clone(), } .fail() } @@ -236,19 +234,19 @@ mod test { #[test] fn test_check_response_header() { - let result = check_response_header(None); + let result = check_response_header(&None); assert!(matches!( result.unwrap_err(), IllegalDatabaseResponse { .. } )); - let result = check_response_header(Some(ResponseHeader { status: None })); + let result = check_response_header(&Some(ResponseHeader { status: None })); assert!(matches!( result.unwrap_err(), IllegalDatabaseResponse { .. 
} )); - let result = check_response_header(Some(ResponseHeader { + let result = check_response_header(&Some(ResponseHeader { status: Some(PbStatus { status_code: StatusCode::Success as u32, err_msg: String::default(), @@ -256,7 +254,7 @@ mod test { })); assert!(result.is_ok()); - let result = check_response_header(Some(ResponseHeader { + let result = check_response_header(&Some(ResponseHeader { status: Some(PbStatus { status_code: u32::MAX, err_msg: String::default(), @@ -267,7 +265,7 @@ mod test { IllegalDatabaseResponse { .. } )); - let result = check_response_header(Some(ResponseHeader { + let result = check_response_header(&Some(ResponseHeader { status: Some(PbStatus { status_code: StatusCode::Internal as u32, err_msg: "blabla".to_string(), diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/datanode_manager.rs index 4795512f25..1fdc2f314e 100644 --- a/src/common/meta/src/datanode_manager.rs +++ b/src/common/meta/src/datanode_manager.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; -use api::v1::region::{QueryRequest, RegionRequest}; +use api::v1::region::{QueryRequest, RegionRequest, RegionResponse}; pub use common_base::AffectedRows; use common_recordbatch::SendableRecordBatchStream; @@ -25,7 +26,7 @@ use crate::peer::Peer; #[async_trait::async_trait] pub trait Datanode: Send + Sync { /// Handles DML, and DDL requests. - async fn handle(&self, request: RegionRequest) -> Result; + async fn handle(&self, request: RegionRequest) -> Result; /// Handles query requests async fn handle_query(&self, request: QueryRequest) -> Result; @@ -41,3 +42,27 @@ pub trait DatanodeManager: Send + Sync { } pub type DatanodeManagerRef = Arc; + +/// This result struct is derived from [RegionResponse] +#[derive(Debug)] +pub struct HandleResponse { + pub affected_rows: AffectedRows, + pub extension: HashMap>, +} + +impl HandleResponse { + pub fn from_region_response(region_response: RegionResponse) -> Self { + Self { + affected_rows: region_response.affected_rows as _, + extension: region_response.extension, + } + } + + /// Creates one response without extension + pub fn new(affected_rows: AffectedRows) -> Self { + Self { + affected_rows, + extension: Default::default(), + } + } +} diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 8831e3aeb2..35a32142e4 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -70,6 +70,7 @@ impl CreateLogicalTablesProcedure { /// - Checks whether physical table exists. /// - Checks whether logical tables exist. /// - Allocates the table ids. + /// - Modify tasks to sort logical columns on their names. /// /// Abort(non-retry): /// - The physical table does not exist. @@ -130,7 +131,7 @@ impl CreateLogicalTablesProcedure { )); } - // Allocates table ids + // Allocates table ids and sort columns on their names. for (task, table_id) in tasks.iter_mut().zip(already_exists_tables_ids.iter()) { let table_id = if let Some(table_id) = table_id { *table_id @@ -141,6 +142,11 @@ impl CreateLogicalTablesProcedure { .await? 
             };
             task.set_table_id(table_id);
+
+            // sort columns in task
+            task.sort_columns();
+
+            common_telemetry::info!("[DEBUG] sorted task {:?}", task);
         }
 
         self.creator
diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs
index e6710d7440..7f82d372ca 100644
--- a/src/common/meta/src/ddl/tests/create_logical_tables.rs
+++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs
@@ -28,6 +28,7 @@ use common_telemetry::debug;
 use store_api::storage::RegionId;
 use table::metadata::RawTableInfo;
 
+use crate::datanode_manager::HandleResponse;
 use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
 use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
 use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
@@ -36,7 +37,7 @@ use crate::error::{Error, Result};
 use crate::key::table_route::TableRouteValue;
 use crate::peer::Peer;
 use crate::rpc::ddl::CreateTableTask;
-use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
+use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
 
 // Note: this code may be duplicated with others.
 // However, it's by design, ensures the tests are easy to be modified or added.
@@ -332,9 +333,9 @@ pub struct NaiveDatanodeHandler;
 
 #[async_trait::async_trait]
 impl MockDatanodeHandler for NaiveDatanodeHandler {
-    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
+    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
         debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
-        Ok(0)
+        Ok(HandleResponse::new(0))
     }
 
     async fn handle_query(
diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs
index d31c78479d..3040ae6d2f 100644
--- a/src/common/meta/src/ddl/tests/create_table.rs
+++ b/src/common/meta/src/ddl/tests/create_table.rs
@@ -26,6 +26,7 @@ use common_procedure_test::MockContextProvider;
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::debug;
 
+use crate::datanode_manager::HandleResponse;
 use crate::ddl::create_table::CreateTableProcedure;
 use crate::ddl::test_util::create_table::build_raw_table_info_from_expr;
 use crate::ddl::test_util::{TestColumnDefBuilder, TestCreateTableExprBuilder};
@@ -34,11 +35,11 @@ use crate::error::{Error, Result};
 use crate::key::table_route::TableRouteValue;
 use crate::peer::Peer;
 use crate::rpc::ddl::CreateTableTask;
-use crate::test_util::{new_ddl_context, AffectedRows, MockDatanodeHandler, MockDatanodeManager};
+use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
 
 #[async_trait::async_trait]
 impl MockDatanodeHandler for () {
-    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<AffectedRows> {
+    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<HandleResponse> {
         unreachable!()
     }
 
@@ -176,7 +177,7 @@ pub struct RetryErrorDatanodeHandler;
 
 #[async_trait::async_trait]
 impl MockDatanodeHandler for RetryErrorDatanodeHandler {
-    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
+    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
         debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
         Err(Error::RetryLater {
             source: BoxedError::new(
@@ -220,7 +221,7 @@ pub struct UnexpectedErrorDatanodeHandler;
 
 #[async_trait::async_trait]
 impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
-    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
+    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
         debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
         error::UnexpectedSnafu {
             err_msg: "mock error",
@@ -260,9 +261,9 @@ pub struct NaiveDatanodeHandler;
 
 #[async_trait::async_trait]
 impl MockDatanodeHandler for NaiveDatanodeHandler {
-    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows> {
+    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse> {
         debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
-        Ok(0)
+        Ok(HandleResponse::new(0))
     }
 
     async fn handle_query(
diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs
index b88867bd3d..b97924aa9f 100644
--- a/src/common/meta/src/rpc/ddl.rs
+++ b/src/common/meta/src/rpc/ddl.rs
@@ -22,7 +22,7 @@ use api::v1::meta::{
     DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId,
     TruncateTableTask as PbTruncateTableTask,
 };
-use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, TruncateTableExpr};
+use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr, SemanticType, TruncateTableExpr};
 use base64::engine::general_purpose;
 use base64::Engine as _;
 use prost::Message;
@@ -368,6 +368,44 @@ impl CreateTableTask {
     pub fn set_table_id(&mut self, table_id: TableId) {
         self.table_info.ident.table_id = table_id;
     }
+
+    /// Sort the columns in [CreateTableExpr] and [RawTableInfo].
+    ///
+    /// This function won't do any check or verification. Caller should
+    /// ensure this task is valid.
+    pub fn sort_columns(&mut self) {
+        // sort create table expr
+        // sort column_defs by name
+        self.create_table
+            .column_defs
+            .sort_unstable_by(|a, b| a.name.cmp(&b.name));
+
+        // compute new indices of sorted columns
+        // this part won't do any check or verification.
+ let mut primary_key_indices = Vec::with_capacity(self.create_table.primary_keys.len()); + let mut value_indices = + Vec::with_capacity(self.create_table.column_defs.len() - primary_key_indices.len() - 1); + let mut timestamp_index = None; + for (index, col) in self.create_table.column_defs.iter().enumerate() { + if self.create_table.primary_keys.contains(&col.name) { + primary_key_indices.push(index); + } else if col.semantic_type == SemanticType::Timestamp as i32 { + timestamp_index = Some(index); + } else { + value_indices.push(index); + } + } + + // overwrite table info + self.table_info + .meta + .schema + .column_schemas + .sort_unstable_by(|a, b| a.name.cmp(&b.name)); + self.table_info.meta.schema.timestamp_index = timestamp_index; + self.table_info.meta.primary_key_indices = primary_key_indices; + self.table_info.meta.value_indices = value_indices; + } } impl Serialize for CreateTableTask { @@ -555,9 +593,11 @@ impl TryFrom for PbTruncateTableTask { mod tests { use std::sync::Arc; - use api::v1::{AlterExpr, CreateTableExpr}; - use datatypes::schema::SchemaBuilder; - use table::metadata::RawTableInfo; + use api::v1::{AlterExpr, ColumnDef, CreateTableExpr, SemanticType}; + use datatypes::schema::{ColumnSchema, RawSchema, SchemaBuilder}; + use store_api::metric_engine_consts::METRIC_ENGINE_NAME; + use store_api::storage::ConcreteDataType; + use table::metadata::{RawTableInfo, RawTableMeta, TableType}; use table::test_util::table_info::test_table_info; use super::{AlterTableTask, CreateTableTask}; @@ -589,4 +629,108 @@ mod tests { let de = serde_json::from_slice(&output).unwrap(); assert_eq!(task, de); } + + #[test] + fn test_sort_columns() { + // construct RawSchema + let raw_schema = RawSchema { + column_schemas: vec![ + ColumnSchema::new( + "column3".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "column1".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + ColumnSchema::new( + "column2".to_string(), + ConcreteDataType::float64_datatype(), + true, + ), + ], + timestamp_index: Some(1), + version: 0, + }; + + // construct RawTableMeta + let raw_table_meta = RawTableMeta { + schema: raw_schema, + primary_key_indices: vec![0], + value_indices: vec![2], + engine: METRIC_ENGINE_NAME.to_string(), + next_column_id: 0, + region_numbers: vec![0], + options: Default::default(), + created_on: Default::default(), + partition_key_indices: Default::default(), + }; + + // construct RawTableInfo + let raw_table_info = RawTableInfo { + ident: Default::default(), + meta: raw_table_meta, + name: Default::default(), + desc: Default::default(), + catalog_name: Default::default(), + schema_name: Default::default(), + table_type: TableType::Base, + }; + + // construct create table expr + let create_table_expr = CreateTableExpr { + column_defs: vec![ + ColumnDef { + name: "column3".to_string(), + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ColumnDef { + name: "column1".to_string(), + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + ColumnDef { + name: "column2".to_string(), + semantic_type: SemanticType::Field as i32, + ..Default::default() + }, + ], + primary_keys: vec!["column3".to_string()], + ..Default::default() + }; + + let mut create_table_task = + CreateTableTask::new(create_table_expr, Vec::new(), raw_table_info); + + // Call the sort_columns method + create_table_task.sort_columns(); + + // Assert that the columns are sorted correctly + assert_eq!( + 
create_table_task.create_table.column_defs[0].name,
+            "column1".to_string()
+        );
+        assert_eq!(
+            create_table_task.create_table.column_defs[1].name,
+            "column2".to_string()
+        );
+        assert_eq!(
+            create_table_task.create_table.column_defs[2].name,
+            "column3".to_string()
+        );
+
+        // Assert that the table_info is updated correctly
+        assert_eq!(
+            create_table_task.table_info.meta.schema.timestamp_index,
+            Some(0)
+        );
+        assert_eq!(
+            create_table_task.table_info.meta.primary_key_indices,
+            vec![2]
+        );
+        assert_eq!(create_table_task.table_info.meta.value_indices, vec![1]);
+    }
 }
diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs
index 605be9dfe3..3a312fa021 100644
--- a/src/common/meta/src/test_util.rs
+++ b/src/common/meta/src/test_util.rs
@@ -19,7 +19,9 @@ pub use common_base::AffectedRows;
 use common_recordbatch::SendableRecordBatchStream;
 
 use crate::cache_invalidator::DummyCacheInvalidator;
-use crate::datanode_manager::{Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef};
+use crate::datanode_manager::{
+    Datanode, DatanodeManager, DatanodeManagerRef, DatanodeRef, HandleResponse,
+};
 use crate::ddl::table_meta::TableMetadataAllocator;
 use crate::ddl::DdlContext;
 use crate::error::Result;
@@ -32,7 +34,7 @@ use crate::wal_options_allocator::WalOptionsAllocator;
 
 #[async_trait::async_trait]
 pub trait MockDatanodeHandler: Sync + Send + Clone {
-    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<AffectedRows>;
+    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<HandleResponse>;
 
     async fn handle_query(
         &self,
@@ -62,7 +64,7 @@ struct MockDatanode {
 
 #[async_trait::async_trait]
 impl Datanode for MockDatanode {
-    async fn handle(&self, request: RegionRequest) -> Result<AffectedRows> {
+    async fn handle(&self, request: RegionRequest) -> Result<HandleResponse> {
         self.handler.handle(&self.peer, request).await
     }
 
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index d9b74e02aa..917fba197d 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -25,6 +25,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use common_error::ext::BoxedError;
 use common_error::status_code::StatusCode;
+use common_meta::datanode_manager::HandleResponse;
 use common_query::logical_plan::Expr;
 use common_query::physical_plan::DfPhysicalPlanAdapter;
 use common_query::{DfPhysicalPlan, OutputData};
@@ -128,7 +129,7 @@ impl RegionServer {
         &self,
         region_id: RegionId,
         request: RegionRequest,
-    ) -> Result<AffectedRows> {
+    ) -> Result<HandleResponse> {
         self.inner.handle_request(region_id, request).await
     }
 
@@ -267,11 +268,10 @@ impl RegionServerHandler for RegionServer {
             results
         };
 
-        // merge results by simply sum up affected rows.
-        // only insert/delete will have multiple results.
+        // merge results by summing up affected rows and merging extensions.
         let mut affected_rows = 0;
         for result in results {
-            affected_rows += result;
+            affected_rows += result.affected_rows;
         }
 
         Ok(RegionResponse {
@@ -282,6 +282,7 @@
                 }),
             }),
             affected_rows: affected_rows as _,
+            extension: Default::default(),
         })
     }
 }
@@ -462,7 +463,7 @@ impl RegionServerInner {
         &self,
         region_id: RegionId,
         request: RegionRequest,
-    ) -> Result<AffectedRows> {
+    ) -> Result<HandleResponse> {
         let request_type = request.request_type();
         let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
             .with_label_values(&[request_type])
@@ -487,7 +488,7 @@
 
         let engine = match self.get_engine(region_id, &region_change)? {
             CurrentEngine::Engine(engine) => engine,
-            CurrentEngine::EarlyReturn(rows) => return Ok(rows),
+            CurrentEngine::EarlyReturn(rows) => return Ok(HandleResponse::new(rows)),
         };
 
         // Sets corresponding region status to registering/deregistering before the operation.
@@ -502,7 +503,10 @@
                 // Sets corresponding region status to ready.
                 self.set_region_status_ready(region_id, engine, region_change)
                     .await?;
-                Ok(result)
+                Ok(HandleResponse {
+                    affected_rows: result.affected_rows,
+                    extension: result.extension,
+                })
             }
             Err(err) => {
                 // Removes the region status if the operation fails.
@@ -645,6 +649,7 @@
             .decode(Bytes::from(plan), catalog_list, "", "")
             .await
             .context(DecodeLogicalPlanSnafu)?;
+
         let result = self
             .query_engine
             .execute(logical_plan.into(), ctx)
@@ -916,11 +921,11 @@ mod tests {
             RegionEngineWithStatus::Registering(engine.clone()),
         );
 
-        let affected_rows = mock_region_server
+        let response = mock_region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
-        assert_eq!(affected_rows, 0);
+        assert_eq!(response.affected_rows, 0);
 
         let status = mock_region_server
             .inner
@@ -931,7 +936,7 @@
 
         assert!(matches!(status, RegionEngineWithStatus::Registering(_)));
 
-        let affected_rows = mock_region_server
+        let response = mock_region_server
             .handle_request(
                 region_id,
                 RegionRequest::Open(RegionOpenRequest {
@@ -943,7 +948,7 @@
             )
             .await
             .unwrap();
-        assert_eq!(affected_rows, 0);
+        assert_eq!(response.affected_rows, 0);
 
         let status = mock_region_server
             .inner
@@ -971,11 +976,11 @@
             RegionEngineWithStatus::Deregistering(engine.clone()),
         );
 
-        let affected_rows = mock_region_server
+        let response = mock_region_server
             .handle_request(region_id, RegionRequest::Drop(RegionDropRequest {}))
             .await
             .unwrap();
-        assert_eq!(affected_rows, 0);
+        assert_eq!(response.affected_rows, 0);
 
         let status = mock_region_server
             .inner
@@ -990,11 +995,11 @@
             RegionEngineWithStatus::Deregistering(engine.clone()),
         );
 
-        let affected_rows = mock_region_server
+        let response = mock_region_server
             .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
             .await
             .unwrap();
-        assert_eq!(affected_rows, 0);
+        assert_eq!(response.affected_rows, 0);
 
         let status = mock_region_server
             .inner
diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs
index 1a38c88f0c..cffa382865 100644
--- a/src/datanode/src/tests.rs
+++ b/src/datanode/src/tests.rs
@@ -31,7 +31,7 @@ use query::query_engine::DescribeResult;
 use query::{QueryEngine, QueryEngineContext};
 use session::context::QueryContextRef;
 use store_api::metadata::RegionMetadataRef;
-use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
+use store_api::region_engine::{RegionEngine, RegionHandleResult, RegionRole, SetReadonlyResponse};
 use store_api::region_request::{AffectedRows, RegionRequest};
 use store_api::storage::{RegionId, ScanRequest};
 use table::TableRef;
@@ -166,16 +166,18 @@ impl RegionEngine for MockRegionEngine {
         &self,
         region_id: RegionId,
         request: RegionRequest,
-    ) -> Result<AffectedRows, BoxedError> {
+    ) -> Result<RegionHandleResult, BoxedError> {
         if let Some(delay) = self.handle_request_delay {
             tokio::time::sleep(delay).await;
         }
         if let Some(mock_fn) = &self.handle_request_mock_fn {
-            return mock_fn(region_id, request).map_err(BoxedError::new);
+            return mock_fn(region_id, request)
+                .map_err(BoxedError::new)
+                .map(RegionHandleResult::new);
         };
 
         let _ = self.sender.send((region_id, request)).await;
-        Ok(0)
+        Ok(RegionHandleResult::new(0))
     }
 
     async fn handle_query(
diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs
index 17c39ee11b..f515370f0e 100644
--- a/src/datatypes/src/schema/column_schema.rs
+++ b/src/datatypes/src/schema/column_schema.rs
@@ -143,11 +143,22 @@ impl ColumnSchema {
     }
 
     /// Set the nullablity to `true` of the column.
+    /// Similar to [set_nullable] but takes ownership and returns an owned value.
+    ///
+    /// [set_nullable]: Self::set_nullable
     pub fn with_nullable_set(mut self) -> Self {
         self.is_nullable = true;
         self
     }
 
+    /// Set the nullability to `true` of the column.
+    /// Similar to [with_nullable_set] but doesn't take ownership.
+    ///
+    /// [with_nullable_set]: Self::with_nullable_set
+    pub fn set_nullable(&mut self) {
+        self.is_nullable = true;
+    }
+
     /// Creates a new [`ColumnSchema`] with given metadata.
     pub fn with_metadata(mut self, metadata: Metadata) -> Self {
         self.metadata = metadata;
diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs
index f5a7d0e259..0b9192b5c9 100644
--- a/src/file-engine/src/engine.rs
+++ b/src/file-engine/src/engine.rs
@@ -24,7 +24,7 @@ use common_telemetry::{error, info};
 use object_store::ObjectStore;
 use snafu::{ensure, OptionExt};
 use store_api::metadata::RegionMetadataRef;
-use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
+use store_api::region_engine::{RegionEngine, RegionHandleResult, RegionRole, SetReadonlyResponse};
 use store_api::region_request::{
     AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
     RegionRequest,
@@ -60,7 +60,7 @@ impl RegionEngine for FileRegionEngine {
         &self,
         region_id: RegionId,
         request: RegionRequest,
-    ) -> Result<AffectedRows, BoxedError> {
+    ) -> Result<RegionHandleResult, BoxedError> {
         self.inner
             .handle_request(region_id, request)
             .await
@@ -154,8 +154,8 @@ impl EngineInner {
         &self,
         region_id: RegionId,
         request: RegionRequest,
-    ) -> EngineResult<AffectedRows> {
-        match request {
+    ) -> EngineResult<RegionHandleResult> {
+        let result = match request {
             RegionRequest::Create(req) => self.handle_create(region_id, req).await,
             RegionRequest::Drop(req) => self.handle_drop(region_id, req).await,
             RegionRequest::Open(req) => self.handle_open(region_id, req).await,
@@ -164,7 +164,8 @@
                 operation: request.to_string(),
             }
             .fail(),
-        }
+        };
+        result.map(RegionHandleResult::new)
     }
 
     async fn stop(&self) -> EngineResult<()> {
diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs
index 2875850425..615e14f5d9 100644
--- a/src/frontend/src/instance/standalone.rs
+++ b/src/frontend/src/instance/standalone.rs
@@ -18,7 +18,7 @@ use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
 use async_trait::async_trait;
 use client::region::check_response_header;
 use common_error::ext::BoxedError;
-use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
+use common_meta::datanode_manager::{Datanode, DatanodeManager, DatanodeRef, HandleResponse};
 use common_meta::error::{self as meta_error, Result as MetaResult};
 use common_meta::peer::Peer;
 use common_recordbatch::SendableRecordBatchStream;
@@ -63,7 +63,7 @@ impl RegionInvoker {
 
 #[async_trait]
 impl Datanode for RegionInvoker {
-    async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
+    async fn handle(&self, request: RegionRequest) -> MetaResult<HandleResponse> {
         let span = request
             .header
             .as_ref()
@@ -76,10 +76,10 @@
             .await
             .map_err(BoxedError::new)
             .context(meta_error::ExternalSnafu)?;
-        check_response_header(response.header)
+        check_response_header(&response.header)
             .map_err(BoxedError::new)
             .context(meta_error::ExternalSnafu)?;
-        Ok(response.affected_rows as _)
+        Ok(HandleResponse::from_region_response(response))
     }
 
     async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs
index 0a9ace054c..408aeb1f29 100644
--- a/src/meta-srv/src/procedure/utils.rs
+++ b/src/meta-srv/src/procedure/utils.rs
@@ -93,6 +93,7 @@ pub mod mock {
                 }),
             }),
             affected_rows: 0,
+            extension: Default::default(),
         })
     }
 }
diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs
index 16028c7027..9207d0f107 100644
--- a/src/metric-engine/src/data_region.rs
+++ b/src/metric-engine/src/data_region.rs
@@ -58,18 +58,19 @@ impl DataRegion {
     /// Invoker don't need to set up or verify the column id. This method will adjust
     /// it using underlying schema.
     ///
-    /// This method will also set the nullable marker to true.
+    /// This method will also set the nullable marker to true. All of those changes are applied
+    /// to `columns` in-place.
     pub async fn add_columns(
         &self,
         region_id: RegionId,
-        columns: Vec<ColumnMetadata>,
+        columns: &mut [ColumnMetadata],
     ) -> Result<()> {
         let region_id = utils::to_data_region_id(region_id);
 
         let mut retries = 0;
         // submit alter request
         while retries < MAX_RETRIES {
-            let request = self.assemble_alter_request(region_id, &columns).await?;
+            let request = self.assemble_alter_request(region_id, columns).await?;
 
             let _timer = MITO_DDL_DURATION.start_timer();
 
@@ -90,10 +91,12 @@
         Ok(())
     }
 
+    /// Generate wrapped [RegionAlterRequest] with given [ColumnMetadata].
+    /// This method will modify `columns` in-place.
     async fn assemble_alter_request(
         &self,
         region_id: RegionId,
-        columns: &[ColumnMetadata],
+        columns: &mut [ColumnMetadata],
     ) -> Result<RegionRequest> {
         // retrieve underlying version
         let region_metadata = self
@@ -118,15 +121,14 @@
             .unwrap_or(0);
 
         // overwrite semantic type
-        let columns = columns
-            .iter()
+        let new_columns = columns
+            .iter_mut()
             .enumerate()
             .map(|(delta, c)| {
-                let mut c = c.clone();
                 if c.semantic_type == SemanticType::Tag {
                     if !c.column_schema.data_type.is_string() {
                         return ColumnTypeMismatchSnafu {
-                            column_type: c.column_schema.data_type,
+                            column_type: c.column_schema.data_type.clone(),
                         }
                         .fail();
                     }
@@ -138,11 +140,10 @@
                 };
 
                 c.column_id = new_column_id_start + delta as u32;
-
-                c.column_schema = c.column_schema.with_nullable_set();
+                c.column_schema.set_nullable();
 
                 Ok(AddColumn {
-                    column_metadata: c,
+                    column_metadata: c.clone(),
                     location: None,
                 })
             })
            .collect::<Result<Vec<_>>>()?;
@@ -151,7 +152,9 @@
         // assemble alter request
         let alter_request = RegionRequest::Alter(RegionAlterRequest {
             schema_version: version,
-            kind: AlterKind::AddColumns { columns },
+            kind: AlterKind::AddColumns {
+                columns: new_columns,
+            },
         });
 
         Ok(alter_request)
@@ -167,6 +170,7 @@
             .handle_request(region_id, RegionRequest::Put(request))
             .await
             .context(MitoWriteOperationSnafu)
+            .map(|result| result.affected_rows)
     }
 
     pub async fn physical_columns(
@@ -205,7 +209,7 @@ mod test {
         // TestEnv will create a logical region which changes the version to 1.
        assert_eq!(current_version, 1);
 
-        let new_columns = vec![
+        let mut new_columns = vec![
             ColumnMetadata {
                 column_id: 0,
                 semantic_type: SemanticType::Tag,
@@ -226,7 +230,7 @@
             },
         ];
         env.data_region()
-            .add_columns(env.default_physical_region_id(), new_columns)
+            .add_columns(env.default_physical_region_id(), &mut new_columns)
             .await
             .unwrap();
 
@@ -258,14 +262,14 @@
         let env = TestEnv::new().await;
         env.init_metric_region().await;
 
-        let new_columns = vec![ColumnMetadata {
+        let mut new_columns = vec![ColumnMetadata {
             column_id: 0,
             semantic_type: SemanticType::Tag,
             column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false),
         }];
         let result = env
             .data_region()
-            .add_columns(env.default_physical_region_id(), new_columns)
+            .add_columns(env.default_physical_region_id(), &mut new_columns)
             .await;
         assert!(result.is_err());
     }
diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs
index 1240b7cd6f..ab6991a13c 100644
--- a/src/metric-engine/src/engine.rs
+++ b/src/metric-engine/src/engine.rs
@@ -24,6 +24,7 @@ mod region_metadata;
 mod state;
 
 use std::any::Any;
+use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
 
 use async_trait::async_trait;
@@ -33,13 +34,13 @@ use common_recordbatch::SendableRecordBatchStream;
 use mito2::engine::MitoEngine;
 use store_api::metadata::RegionMetadataRef;
 use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
-use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
-use store_api::region_request::{AffectedRows, RegionRequest};
+use store_api::region_engine::{RegionEngine, RegionHandleResult, RegionRole, SetReadonlyResponse};
+use store_api::region_request::RegionRequest;
 use store_api::storage::{RegionId, ScanRequest};
 
 use self::state::MetricEngineState;
 use crate::data_region::DataRegion;
-use crate::error::Result;
+use crate::error::{Result, UnsupportedRegionRequestSnafu};
 use crate::metadata_region::MetadataRegion;
 use crate::utils;
 
@@ -121,23 +122,39 @@ impl RegionEngine for MetricEngine {
         &self,
         region_id: RegionId,
         request: RegionRequest,
-    ) -> Result<AffectedRows, BoxedError> {
+    ) -> Result<RegionHandleResult, BoxedError> {
+        let mut extension_return_value = HashMap::new();
+
         let result = match request {
             RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
-            RegionRequest::Delete(_) => todo!(),
-            RegionRequest::Create(create) => self.inner.create_region(region_id, create).await,
+            RegionRequest::Create(create) => {
+                self.inner
+                    .create_region(region_id, create, &mut extension_return_value)
+                    .await
+            }
             RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
             RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
             RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
-            RegionRequest::Alter(alter) => self.inner.alter_region(region_id, alter).await,
-            RegionRequest::Flush(_) => todo!(),
-            RegionRequest::Compact(_) => todo!(),
-            RegionRequest::Truncate(_) => todo!(),
+            RegionRequest::Alter(alter) => {
+                self.inner
+                    .alter_region(region_id, alter, &mut extension_return_value)
+                    .await
+            }
+            RegionRequest::Delete(_)
+            | RegionRequest::Flush(_)
+            | RegionRequest::Compact(_)
+            | RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
             // It always Ok(0), all data is the latest.
             RegionRequest::Catchup(_) => Ok(0),
         };
 
-        result.map_err(BoxedError::new)
+        // TODO: pass extension
+        result
+            .map_err(BoxedError::new)
+            .map(|rows| RegionHandleResult {
+                affected_rows: rows,
+                extension: extension_return_value,
+            })
     }
 
     /// Handles substrait query and return a stream of record batches
diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs
index 60520407ec..c99942dec6 100644
--- a/src/metric-engine/src/engine/alter.rs
+++ b/src/metric-engine/src/engine/alter.rs
@@ -12,13 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use common_telemetry::{error, info};
-use snafu::OptionExt;
+use snafu::{OptionExt, ResultExt};
+use store_api::metadata::ColumnMetadata;
+use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
 use store_api::region_request::{AffectedRows, AlterKind, RegionAlterRequest};
 use store_api::storage::RegionId;
 
 use crate::engine::MetricEngineInner;
-use crate::error::{ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result};
+use crate::error::{
+    ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
+};
 use crate::metrics::FORBIDDEN_OPERATION_COUNT;
 use crate::utils::{to_data_region_id, to_metadata_region_id};
 
@@ -28,23 +34,39 @@ impl MetricEngineInner {
         &self,
         region_id: RegionId,
         request: RegionAlterRequest,
+        extension_return_value: &mut HashMap<String, Vec<u8>>,
     ) -> Result<AffectedRows> {
         let is_altering_physical_region = self.is_physical_region(region_id);
 
         let result = if is_altering_physical_region {
             self.alter_physical_region(region_id, request).await
         } else {
-            self.alter_logical_region(region_id, request).await
+            let physical_region_id = self.alter_logical_region(region_id, request).await?;
+
+            // Add physical table's columns to the extension map.
+            // It's ok to overwrite the existing key, as the later-arriving schema is more up-to-date.
+            let physical_columns = self
+                .data_region
+                .physical_columns(physical_region_id)
+                .await?;
+            extension_return_value.insert(
+                ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
+                ColumnMetadata::encode_list(&physical_columns)
+                    .context(SerializeColumnMetadataSnafu)?,
+            );
+
+            Ok(())
         };
 
         result.map(|_| 0)
     }
 
+    /// Return the physical region id behind this logical region.
     async fn alter_logical_region(
         &self,
         region_id: RegionId,
         request: RegionAlterRequest,
-    ) -> Result<()> {
+    ) -> Result<RegionId> {
         let physical_region_id = {
             let state = &self.state.read().unwrap();
             state.get_physical_region_id(region_id).with_context(|| {
@@ -55,7 +77,7 @@
 
         // only handle adding column
         let AlterKind::AddColumns { columns } = request.kind else {
-            return Ok(());
+            return Ok(physical_region_id);
         };
 
         let metadata_region_id = to_metadata_region_id(physical_region_id);
@@ -92,7 +114,7 @@
                 .await?;
         }
 
-        Ok(())
+        Ok(physical_region_id)
     }
 
     async fn alter_physical_region(
diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs
index 8fad9f9dc8..b8b39a11dd 100644
--- a/src/metric-engine/src/engine/create.rs
+++ b/src/metric-engine/src/engine/create.rs
@@ -15,6 +15,7 @@
 use std::collections::{HashMap, HashSet};
 
 use api::v1::SemanticType;
+use common_error::ext::BoxedError;
 use common_telemetry::info;
 use common_time::Timestamp;
 use datatypes::data_type::ConcreteDataType;
@@ -25,11 +26,12 @@ use object_store::util::join_dir;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::metadata::ColumnMetadata;
 use store_api::metric_engine_consts::{
-    DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
-    LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR, METADATA_SCHEMA_KEY_COLUMN_INDEX,
-    METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX,
-    METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
-    METADATA_SCHEMA_VALUE_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
+    ALTER_PHYSICAL_EXTENSION_KEY, DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
+    DATA_SCHEMA_TSID_COLUMN_NAME, LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR,
+    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
+    METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
+    METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
+    PHYSICAL_TABLE_METADATA_KEY,
 };
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
@@ -41,8 +43,9 @@ use crate::engine::options::{
 };
 use crate::engine::MetricEngineInner;
 use crate::error::{
-    ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu,
-    MissingRegionOptionSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result,
+    ColumnNotFoundSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
+    InternalColumnOccupiedSnafu, MissingRegionOptionSnafu, MitoReadOperationSnafu,
+    ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result, SerializeColumnMetadataSnafu,
 };
 use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
 use crate::utils::{to_data_region_id, to_metadata_region_id};
@@ -53,13 +56,28 @@ impl MetricEngineInner {
         &self,
         region_id: RegionId,
         request: RegionCreateRequest,
+        extension_return_value: &mut HashMap<String, Vec<u8>>,
     ) -> Result<AffectedRows> {
         Self::verify_region_create_request(&request)?;
 
         let result = if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) {
             self.create_physical_region(region_id, request).await
         } else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) {
-            self.create_logical_region(region_id, request).await
+            let physical_region_id = self.create_logical_region(region_id, request).await?;
+
+            // Add physical table's columns to the extension map.
+            // It's ok to overwrite the existing key, as the later-arriving schema is more up-to-date.
+            let physical_columns = self
+                .data_region
+                .physical_columns(physical_region_id)
+                .await?;
+            extension_return_value.insert(
+                ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
+                ColumnMetadata::encode_list(&physical_columns)
+                    .context(SerializeColumnMetadataSnafu)?,
+            );
+
+            Ok(())
         } else {
             MissingRegionOptionSnafu {}.fail()
         };
@@ -126,11 +144,16 @@
     /// This method will alter the data region to add columns if necessary.
     ///
     /// If the logical region to create already exists, this method will do nothing.
+    ///
+    /// `alter_request` is a hashmap that stores the alter requests that were executed
+    /// on the physical region.
+    ///
+    /// Return the physical region id of this logical region.
     async fn create_logical_region(
         &self,
         logical_region_id: RegionId,
         request: RegionCreateRequest,
-    ) -> Result<()> {
+    ) -> Result<RegionId> {
         // transform IDs
         let physical_region_id_raw = request
             .options
@@ -151,11 +174,12 @@
             .await?
         {
            info!("Create an existing logical region {logical_region_id}. Skipped");
-            return Ok(());
+            return Ok(data_region_id);
         }
 
         // find new columns to add
         let mut new_columns = vec![];
+        let mut existing_columns = vec![];
         {
             let state = &self.state.read().unwrap();
             let physical_columns =
@@ -168,6 +192,8 @@
             for col in &request.column_metadatas {
                 if !physical_columns.contains(&col.column_schema.name) {
                     new_columns.push(col.clone());
+                } else {
+                    existing_columns.push(col.column_schema.name.clone());
                 }
             }
         }
@@ -188,9 +214,28 @@
         self.metadata_region
             .add_logical_region(metadata_region_id, logical_region_id)
             .await?;
-        for col in &request.column_metadatas {
+
+        // register existing physical columns to this new logical region.
+        let physical_schema = self
+            .data_region
+            .physical_columns(data_region_id)
+            .await
+            .map_err(BoxedError::new)
+            .context(MitoReadOperationSnafu)?;
+        let physical_schema_map = physical_schema
+            .into_iter()
+            .map(|metadata| (metadata.column_schema.name.clone(), metadata))
+            .collect::<HashMap<_, _>>();
+        for col in &existing_columns {
+            let column_metadata = physical_schema_map
+                .get(col)
+                .with_context(|| ColumnNotFoundSnafu {
+                    name: col,
+                    region_id: physical_region_id,
+                })?
+                .clone();
             self.metadata_region
-                .add_column(metadata_region_id, logical_region_id, col)
+                .add_column(metadata_region_id, logical_region_id, &column_metadata)
                 .await?;
         }
 
@@ -203,19 +248,21 @@
         info!("Created new logical region {logical_region_id} on physical region {data_region_id}");
         LOGICAL_REGION_COUNT.inc();
 
-        Ok(())
+        Ok(data_region_id)
     }
 
+    /// Execute corresponding alter requests to the mito region. Newly added columns' [ColumnMetadata] will be
+    /// cloned into `added_columns`.
    pub(crate) async fn add_columns_to_physical_data_region(
         &self,
         data_region_id: RegionId,
         metadata_region_id: RegionId,
         logical_region_id: RegionId,
-        new_columns: Vec<ColumnMetadata>,
+        mut new_columns: Vec<ColumnMetadata>,
     ) -> Result<()> {
         // alter data region
         self.data_region
-            .add_columns(data_region_id, new_columns.clone())
+            .add_columns(data_region_id, &mut new_columns)
             .await?;
 
         // register columns to metadata region
@@ -362,13 +409,13 @@
         // concat region dir
         data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
 
-        // convert semantic type
+        // change nullability for tag columns
         data_region_request
             .column_metadatas
             .iter_mut()
             .for_each(|metadata| {
                 if metadata.semantic_type == SemanticType::Tag {
-                    metadata.semantic_type = SemanticType::Field;
+                    metadata.column_schema.set_nullable();
                 }
             });
 
diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs
index 5ce2bf227b..72c2aeb180 100644
--- a/src/metric-engine/src/engine/put.rs
+++ b/src/metric-engine/src/engine/put.rs
@@ -215,12 +215,12 @@ mod tests {
 
         // write data
         let logical_region_id = env.default_logical_region_id();
-        let count = env
+        let result = env
             .metric()
             .handle_request(logical_region_id, request)
             .await
             .unwrap();
-        assert_eq!(count, 5);
+        assert_eq!(result.affected_rows, 5);
 
         // read data from physical region
         let physical_region_id = env.default_physical_region_id();
@@ -287,11 +287,11 @@
         });
 
         // write data
-        let count = engine
+        let result = engine
             .handle_request(logical_region_id, request)
             .await
             .unwrap();
-        assert_eq!(100, count);
+        assert_eq!(100, result.affected_rows);
     }
 
     #[tokio::test]
diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs
index 154a9a61b0..6093d41cd2 100644
--- a/src/metric-engine/src/engine/read.rs
+++ b/src/metric-engine/src/engine/read.rs
@@ -143,6 +143,7 @@
             self.default_projection(physical_region_id, logical_region_id)
                 .await?
         };
+
         request.projection = Some(physical_projection);
 
         // add table filter
@@ -186,6 +187,7 @@
             .get_metadata(data_region_id)
             .await
             .context(MitoReadOperationSnafu)?;
+
         for name in projected_logical_names {
             // Safety: logical columns is a strict subset of physical columns
             physical_projection.push(physical_metadata.column_index_by_name(&name).unwrap());
@@ -301,7 +303,7 @@ mod test {
             .await
             .unwrap();
 
-        assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]);
+        assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]);
         assert_eq!(scan_req.filters.len(), 1);
         assert_eq!(
             scan_req.filters[0],
@@ -318,6 +320,6 @@
             .transform_request(physical_region_id, logical_region_id, scan_req)
             .await
             .unwrap();
-        assert_eq!(scan_req.projection.unwrap(), vec![0, 1, 4, 8, 9, 10, 11]);
+        assert_eq!(scan_req.projection.unwrap(), vec![11, 10, 9, 8, 0, 1, 4]);
     }
 }
diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs
index 150d1a324c..1fa33ec39d 100644
--- a/src/metric-engine/src/engine/region_metadata.rs
+++ b/src/metric-engine/src/engine/region_metadata.rs
@@ -39,7 +39,8 @@
             .collect::<Vec<_>>();
 
         // sort columns on column id to ensure the order
-        logical_column_metadata.sort_unstable_by_key(|col| col.column_id);
+        logical_column_metadata
+            .sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
 
         Ok(logical_column_metadata)
     }
diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs
index aa4f35472c..b256894729 100644
--- a/src/metric-engine/src/error.rs
+++ b/src/metric-engine/src/error.rs
@@ -19,6 +19,7 @@
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
 use datatypes::prelude::ConcreteDataType;
 use snafu::{Location, Snafu};
+use store_api::region_request::RegionRequest;
 use store_api::storage::RegionId;
 
 #[derive(Snafu)]
@@ -71,6 +72,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Failed to serialize column metadata"))]
+    SerializeColumnMetadata {
+        #[snafu(source)]
+        error: serde_json::Error,
+        location: Location,
+    },
+
     #[snafu(display("Failed to decode base64 column value"))]
     DecodeColumnValue {
         #[snafu(source)]
         error:
@@ -155,6 +163,12 @@ pub enum Error {
         region_id: RegionId,
         location: Location,
     },
+
+    #[snafu(display("Unsupported region request: {}", request))]
+    UnsupportedRegionRequest {
+        request: RegionRequest,
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -170,11 +184,14 @@ impl ErrorExt for Error {
             | ColumnTypeMismatch { .. }
             | PhysicalRegionBusy { .. } => StatusCode::InvalidArguments,
 
-            ForbiddenPhysicalAlter { .. } => StatusCode::Unsupported,
+            ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } => {
+                StatusCode::Unsupported
+            }
 
             MissingInternalColumn { .. }
             | DeserializeSemanticType { .. }
             | DeserializeColumnMetadata { .. }
+            | SerializeColumnMetadata { .. }
             | DecodeColumnValue { .. }
             | ParseRegionId { .. }
             | InvalidMetadata { .. } => StatusCode::Unexpected,
diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs
index 918f7f1023..73ad6e9b32 100644
--- a/src/metric-engine/src/metadata_region.rs
+++ b/src/metric-engine/src/metadata_region.rs
@@ -167,7 +167,7 @@
     // TODO(ruihang): avoid using `get_all`
     /// Get all the columns of a given logical region.
-    /// Return a list of (column_name, semantic_type).
+    /// Return a list of (column_name, column_metadata).
    pub async fn logical_columns(
         &self,
         physical_region_id: RegionId,
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 16a3bca5dc..449f87dcf4 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -57,7 +57,7 @@ use object_store::manager::ObjectStoreManagerRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadataRef;
-use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
+use store_api::region_engine::{RegionEngine, RegionHandleResult, RegionRole, SetReadonlyResponse};
 use store_api::region_request::{AffectedRows, RegionRequest};
 use store_api::storage::{RegionId, ScanRequest};
 use tokio::sync::oneshot;
@@ -290,10 +290,11 @@ impl RegionEngine for MitoEngine {
         &self,
         region_id: RegionId,
         request: RegionRequest,
-    ) -> Result<AffectedRows, BoxedError> {
+    ) -> Result<RegionHandleResult, BoxedError> {
         self.inner
             .handle_request(region_id, request)
             .await
+            .map(RegionHandleResult::new)
             .map_err(BoxedError::new)
     }
 
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index ca6f0c173e..78cdb28545 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -111,7 +111,7 @@ async fn test_region_replay() {
 
     let engine = env.reopen_engine(engine, MitoConfig::default()).await;
 
-    let rows = engine
+    let result = engine
         .handle_request(
             region_id,
             RegionRequest::Open(RegionOpenRequest {
@@ -123,7 +123,7 @@
         )
         .await
         .unwrap();
-    assert_eq!(0, rows);
+    assert_eq!(0, result.affected_rows);
 
     let request = ScanRequest::default();
     let stream = engine.handle_query(region_id, request).await.unwrap();
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index 42aba40cc0..71b859cced 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -42,7 +42,7 @@ async fn put_and_flush(
     };
     put_rows(engine, region_id, rows).await;
 
-    let rows = engine
+    let result = engine
         .handle_request(
             region_id,
             RegionRequest::Flush(RegionFlushRequest {
@@ -51,7 +51,7 @@
         )
         .await
         .unwrap();
-    assert_eq!(0, rows);
+    assert_eq!(0, result.affected_rows);
 }
 
 async fn delete_and_flush(
@@ -66,16 +66,16 @@
         rows: build_rows_for_key("a", rows.start, rows.end, 0),
     };
 
-    let rows_affected = engine
+    let result = engine
         .handle_request(
             region_id,
             RegionRequest::Delete(RegionDeleteRequest { rows }),
         )
         .await
         .unwrap();
-    assert_eq!(row_cnt, rows_affected);
+    assert_eq!(row_cnt, result.affected_rows);
 
-    let rows = engine
+    let result = engine
         .handle_request(
             region_id,
             RegionRequest::Flush(RegionFlushRequest {
@@ -84,7 +84,7 @@
         )
         .await
         .unwrap();
-    assert_eq!(0, rows);
+    assert_eq!(0, result.affected_rows);
 }
 
 async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
@@ -127,11 +127,11 @@
     delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
     put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
 
-    let output = engine
+    let result = engine
         .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {}))
         .await
         .unwrap();
-    assert_eq!(output, 0);
+    assert_eq!(result.affected_rows, 0);
 
     let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
     assert_eq!(
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 7339949490..9baa736493 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -712,11 +712,11 @@ pub fn delete_rows_schema(request: &RegionCreateRequest) -> Vec<ColumnSchema>
-    let rows = engine
+    let result = engine
         .handle_request(
             region_id,
             RegionRequest::Flush(RegionFlushRequest { row_group_size }),
         )
         .await
         .unwrap();
-    assert_eq!(0, rows);
+    assert_eq!(0, result.affected_rows);
 }
 
 /// Reopen a region.
diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs
index 2a4737a79a..60e44cb7c6 100644
--- a/src/operator/src/delete.rs
+++ b/src/operator/src/delete.rs
@@ -144,7 +144,10 @@ impl Deleter {
         });
         let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
 
-        let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?;
+        let affected_rows = results
+            .into_iter()
+            .map(|resp| resp.map(|r| r.affected_rows))
+            .sum::<Result<AffectedRows>>()?;
         crate::metrics::DIST_DELETE_ROW_COUNT.inc_by(affected_rows as u64);
         Ok(affected_rows)
     }
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 46fe574c38..526c540d2e 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -216,7 +216,10 @@ impl Inserter {
         });
         let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
 
-        let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?;
+        let affected_rows = results
+            .into_iter()
+            .map(|resp| resp.map(|r| r.affected_rows))
+            .sum::<Result<AffectedRows>>()?;
         crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64);
         Ok(Output::new(
             OutputData::AffectedRows(affected_rows),
diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs
index a300da034c..3aba4dfd70 100644
--- a/src/operator/src/request.rs
+++ b/src/operator/src/request.rs
@@ -181,7 +181,10 @@ impl Requester {
         });
         let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
 
-        let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?;
+        let affected_rows = results
+            .into_iter()
+            .map(|resp| resp.map(|r| r.affected_rows))
+            .sum::<Result<AffectedRows>>()?;
         Ok(affected_rows)
     }
 
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index 6500b99c29..ea66296462 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -48,6 +48,7 @@ use sql::statements::alter::AlterTable;
 use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions};
 use sql::statements::sql_value_to_value;
 use sqlparser::ast::{Expr, Ident, Value as ParserValue};
+use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
 use table::dist_table::DistTable;
 use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
 use table::requests::{AlterKind, AlterTableRequest, TableOptions};
@@ -138,6 +139,22 @@ impl StatementExecutor {
         partitions: Option<Partitions>,
         query_ctx: &QueryContextRef,
     ) -> Result<TableRef> {
+        // Check if it is creating a logical table
+        if create_table.engine == METRIC_ENGINE_NAME
+            && create_table
+                .table_options
+                .contains_key(LOGICAL_TABLE_METADATA_KEY)
+        {
+            return self
+                .create_logical_tables(&[create_table.clone()])
+                .await?
+                .into_iter()
+                .next()
+                .context(error::UnexpectedSnafu {
+                    violated: "expected to create a logical table",
+                });
+        }
+
         let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
         let schema = self
             .table_metadata_manager
diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs
index 0ca9109514..1c9c970011 100644
--- a/src/store-api/src/metadata.rs
+++ b/src/store-api/src/metadata.rs
@@ -95,6 +95,16 @@ impl ColumnMetadata {
             column_id,
         })
     }
+
+    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs
index 0ca9109514..1c9c970011 100644
--- a/src/store-api/src/metadata.rs
+++ b/src/store-api/src/metadata.rs
@@ -95,6 +95,16 @@ impl ColumnMetadata {
             column_id,
         })
     }
+
+    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
+    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
+        serde_json::to_vec(columns)
+    }
+
+    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
+    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
+        serde_json::from_slice(bytes)
+    }
 }
 
 #[cfg_attr(doc, aquamarine::aquamarine)]
diff --git a/src/store-api/src/metric_engine_consts.rs b/src/store-api/src/metric_engine_consts.rs
index d28e4d538a..1f0343f1b0 100644
--- a/src/store-api/src/metric_engine_consts.rs
+++ b/src/store-api/src/metric_engine_consts.rs
@@ -66,3 +66,7 @@ pub const PHYSICAL_TABLE_METADATA_KEY: &str = "physical_metric_table";
 /// ```
 /// And this key will be translated to corresponding physical **REGION** id in metasrv.
 pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table";
+
+/// HashMap key to be used in the region server's extension response.
+/// Represents a list of column metadata that were added to the physical table.
+pub const ALTER_PHYSICAL_EXTENSION_KEY: &str = "ALTER_PHYSICAL";
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index 3b9052a16a..8ec0d4a611 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -15,6 +15,7 @@
 //! Region Engine's definition
 
 use std::any::Any;
+use std::collections::HashMap;
 use std::fmt::Display;
 use std::sync::Arc;
 
@@ -129,7 +130,7 @@ pub trait RegionEngine: Send + Sync {
         &self,
         region_id: RegionId,
         request: RegionRequest,
-    ) -> Result<AffectedRows, BoxedError>;
+    ) -> Result<RegionHandleResult, BoxedError>;
 
     /// Handles substrait query and return a stream of record batches
     async fn handle_query(
@@ -171,3 +172,20 @@ pub trait RegionEngine: Send + Sync {
 }
 
 pub type RegionEngineRef = Arc<dyn RegionEngine>;
+
+// TODO: reorganize the dependencies to merge this struct with the
+// one in common_meta
+#[derive(Debug)]
+pub struct RegionHandleResult {
+    pub affected_rows: AffectedRows,
+    pub extension: HashMap<String, Vec<u8>>,
+}
+
+impl RegionHandleResult {
+    pub fn new(affected_rows: AffectedRows) -> Self {
+        Self {
+            affected_rows,
+            extension: Default::default(),
+        }
+    }
+}
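The three store-api hunks above together define the wire contract for reporting new physical columns: the metric engine JSON-encodes the added `ColumnMetadata` list with `encode_list`, stores the bytes in `RegionHandleResult::extension` under `ALTER_PHYSICAL_EXTENSION_KEY`, and the consumer decodes them with `decode_list`. A sketch of that round trip, using a simplified `ColumnMetadata` stand-in (the real struct carries a column schema, a semantic type, and a column id):

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Simplified stand-in for store_api::metadata::ColumnMetadata.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct ColumnMetadata {
    column_name: String,
    column_id: u32,
}

const ALTER_PHYSICAL_EXTENSION_KEY: &str = "ALTER_PHYSICAL";

fn main() -> serde_json::Result<()> {
    let added = vec![ColumnMetadata {
        column_name: "host".to_string(),
        column_id: 2,
    }];

    // Producer side: encode the added columns and stash the bytes in the
    // extension map of the region server's response.
    let mut extension: HashMap<String, Vec<u8>> = HashMap::new();
    extension.insert(
        ALTER_PHYSICAL_EXTENSION_KEY.to_string(),
        serde_json::to_vec(&added)?,
    );

    // Consumer side: look up the key and decode the same list back.
    let bytes = extension.get(ALTER_PHYSICAL_EXTENSION_KEY).unwrap();
    let decoded: Vec<ColumnMetadata> = serde_json::from_slice(bytes)?;
    assert_eq!(added, decoded);
    Ok(())
}
```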
diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs
index 628b7ce2c2..064ddbe5b7 100644
--- a/src/table/src/metadata.rs
+++ b/src/table/src/metadata.rs
@@ -567,13 +567,19 @@ impl From<TableId> for TableIdent {
 #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
 pub struct RawTableMeta {
     pub schema: RawSchema,
+    /// The indices of columns in primary key. Note that the index of the timestamp
+    /// column is not included. Order matters to this array.
     pub primary_key_indices: Vec<usize>,
+    /// The indices of value columns. Order doesn't matter to this array.
     pub value_indices: Vec<usize>,
+    /// Engine type of this table. Usually in lowercase.
     pub engine: String,
+    /// Deprecated. See https://github.com/GreptimeTeam/greptimedb/issues/2982
    pub next_column_id: ColumnId,
     pub region_numbers: Vec<RegionNumber>,
     pub options: TableOptions,
     pub created_on: DateTime<Utc>,
+    /// Order doesn't matter to this array.
     #[serde(default)]
     pub partition_key_indices: Vec<usize>,
 }
diff --git a/tests-integration/src/prom_store.rs b/tests-integration/src/prom_store.rs
index 73502c4f74..dd6d1b9b10 100644
--- a/tests-integration/src/prom_store.rs
+++ b/tests-integration/src/prom_store.rs
@@ -195,14 +195,14 @@ mod tests {
                     name: prom_store::METRIC_NAME_LABEL.to_string(),
                     value: "metric3".to_string(),
                 },
-                Label {
-                    name: "idc".to_string(),
-                    value: "z002".to_string(),
-                },
                 Label {
                     name: "app".to_string(),
                     value: "biz".to_string(),
                 },
+                Label {
+                    name: "idc".to_string(),
+                    value: "z002".to_string(),
+                },
             ],
             timeseries.labels
         );
diff --git a/tests/cases/standalone/common/create/create_metric_table.result b/tests/cases/standalone/common/create/create_metric_table.result
index 31a095c95a..3d4cb611cd 100644
--- a/tests/cases/standalone/common/create/create_metric_table.result
+++ b/tests/cases/standalone/common/create/create_metric_table.result
@@ -38,9 +38,9 @@ DESC TABLE t1;
 +--------+----------------------+-----+------+---------+---------------+
 | Column | Type                 | Key | Null | Default | Semantic Type |
 +--------+----------------------+-----+------+---------+---------------+
+| host   | String               | PRI | YES  |         | TAG           |
 | ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
 | val    | Float64              |     | YES  |         | FIELD         |
-| host   | String               | PRI | YES  |         | TAG           |
 +--------+----------------------+-----+------+---------+---------------+
 
 DESC TABLE t2;
@@ -48,8 +48,8 @@ DESC TABLE t2;
 +--------+----------------------+-----+------+---------+---------------+
 | Column | Type                 | Key | Null | Default | Semantic Type |
 +--------+----------------------+-----+------+---------+---------------+
-| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
 | job    | String               | PRI | YES  |         | TAG           |
+| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
 | val    | Float64              |     | YES  |         | FIELD         |
 +--------+----------------------+-----+------+---------+---------------+
 
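The reordered expectations in the two test files above are the visible effect of the new sort-logical-columns-by-name behavior: a logical table's columns now come back sorted by name rather than in declaration order, which is why the `host` and `job` tags move ahead of `ts` and `val`. A tiny illustration of the invariant the tests encode (hypothetical helper, not the patch's actual `sort_columns` implementation):

```rust
// Hypothetical helper illustrating the ordering the tests now expect:
// logical columns come back sorted by name, not by declaration order.
fn sort_by_name(mut columns: Vec<String>) -> Vec<String> {
    columns.sort_unstable();
    columns
}

fn main() {
    // t1 was declared as (ts, val, host), but DESC TABLE now lists (host, ts, val).
    let declared = vec!["ts".to_string(), "val".to_string(), "host".to_string()];
    assert_eq!(sort_by_name(declared), vec!["host", "ts", "val"]);
}
```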
diff --git a/tests/cases/standalone/common/insert/logical_metric_table.result b/tests/cases/standalone/common/insert/logical_metric_table.result
index 09ddac341c..390773c80f 100644
--- a/tests/cases/standalone/common/insert/logical_metric_table.result
+++ b/tests/cases/standalone/common/insert/logical_metric_table.result
@@ -6,18 +6,18 @@ CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) e
 
 Affected Rows: 0
 
-INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2');
+INSERT INTO t1 VALUES ('host1', 0, 0), ('host2', 1, 1);
 
 Affected Rows: 2
 
 SELECT * from t1;
 
-+-------------------------+-----+-------+
-| ts                      | val | host  |
-+-------------------------+-----+-------+
-| 1970-01-01T00:00:00.001 | 1.0 | host2 |
-| 1970-01-01T00:00:00     | 0.0 | host1 |
-+-------------------------+-----+-------+
++-------+-------------------------+-----+
+| host  | ts                      | val |
++-------+-------------------------+-----+
+| host2 | 1970-01-01T00:00:00.001 | 1.0 |
+| host1 | 1970-01-01T00:00:00     | 0.0 |
++-------+-------------------------+-----+
 
 CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
 
@@ -28,18 +28,18 @@ SELECT * from t2;
 
 ++
 ++
 
-INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1);
+INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
 
 Affected Rows: 2
 
 SELECT * from t2;
 
-+-------------------------+------+-----+
-| ts                      | job  | val |
-+-------------------------+------+-----+
-| 1970-01-01T00:00:00.001 | job2 | 1.0 |
-| 1970-01-01T00:00:00     | job1 | 0.0 |
-+-------------------------+------+-----+
++------+-------------------------+-----+
+| job  | ts                      | val |
++------+-------------------------+-----+
+| job2 | 1970-01-01T00:00:00.001 | 1.0 |
+| job1 | 1970-01-01T00:00:00     | 0.0 |
++------+-------------------------+-----+
 
 DROP TABLE t1;
diff --git a/tests/cases/standalone/common/insert/logical_metric_table.sql b/tests/cases/standalone/common/insert/logical_metric_table.sql
index fa2b6e0b6d..a7ff6adbdf 100644
--- a/tests/cases/standalone/common/insert/logical_metric_table.sql
+++ b/tests/cases/standalone/common/insert/logical_metric_table.sql
@@ -2,7 +2,7 @@ CREATE TABLE phy (ts timestamp time index, val double) engine=metric with ("phys
 
 CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "phy");
 
-INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2');
+INSERT INTO t1 VALUES ('host1', 0, 0), ('host2', 1, 1);
 
 SELECT * from t1;
 
@@ -10,7 +10,7 @@ CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) en
 
 SELECT * from t2;
 
-INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1);
+INSERT INTO t2 VALUES ('job1', 0, 0), ('job2', 1, 1);
 
 SELECT * from t2;