From 992c7ec71badf803ebfc608d9fe96e1b817a7e43 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 25 Mar 2024 11:19:30 +0800 Subject: [PATCH] feat: update physical table's schema on creating logical table (#3570) * feat: update physical table's schema on creating logical table Signed-off-by: Ruihang Xia * remove debug code Signed-off-by: Ruihang Xia * update sqlness cases Signed-off-by: Ruihang Xia * tweak ut const Signed-off-by: Ruihang Xia * update sqlness cases Signed-off-by: Ruihang Xia * invalid physical table cache Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- .../meta/src/ddl/create_logical_tables.rs | 134 ++++++++++++++++-- src/datanode/src/region_server.rs | 4 +- src/meta-srv/src/procedure/tests.rs | 2 +- src/metric-engine/src/engine.rs | 1 - .../common/create/create_metric_table.result | 27 ++-- .../common/create/create_metric_table.sql | 3 +- .../common/insert/logical_metric_table.result | 24 ++++ .../common/insert/logical_metric_table.sql | 4 + tests/runner/src/env.rs | 2 + 9 files changed, 179 insertions(+), 22 deletions(-) diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 35a32142e4..0c7dd792ef 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -12,30 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::ops::Deref; use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{CreateRequests, RegionRequest, RegionRequestHeader}; -use api::v1::CreateTableExpr; +use api::v1::{CreateTableExpr, SemanticType}; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; -use common_telemetry::info; use common_telemetry::tracing_context::TracingContext; +use common_telemetry::{info, warn}; use futures_util::future::join_all; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metadata::ColumnMetadata; +use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; +use crate::cache_invalidator::Context; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path}; use crate::ddl::DdlContext; -use crate::error::{Result, TableAlreadyExistsSnafu}; +use crate::error::{ + DecodeJsonSnafu, MetadataCorruptionSnafu, Result, TableAlreadyExistsSnafu, + TableInfoNotFoundSnafu, +}; +use crate::instruction::CacheIdent; +use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; +use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; @@ -169,11 +179,12 @@ impl CreateLogicalTablesProcedure { self.create_regions(region_routes).await } - /// Creates table metadata + /// Creates table metadata for logical tables and update corresponding physical + /// table's metadata. /// /// Abort(not-retry): /// - Failed to create table metadata. - pub async fn on_create_metadata(&self) -> Result { + pub async fn on_create_metadata(&mut self) -> Result { let manager = &self.context.table_metadata_manager; let physical_table_id = self.creator.data.physical_table_id(); let remaining_tasks = self.creator.data.remaining_tasks(); @@ -208,6 +219,42 @@ impl CreateLogicalTablesProcedure { .map(|task| task.table_info.ident.table_id) .collect::>(); + if !self.creator.data.physical_columns.is_empty() { + // fetch old physical table's info + let physical_table_info = self + .context + .table_metadata_manager + .get_full_table_info(self.creator.data.physical_table_id) + .await? + .0 + .context(TableInfoNotFoundSnafu { + table_name: format!("table id - {}", self.creator.data.physical_table_id), + })?; + + // generate new table info + let new_table_info = self + .creator + .data + .build_new_physical_table_info(&physical_table_info); + + // update physical table's metadata + self.context + .table_metadata_manager + .update_table_info(physical_table_info, new_table_info) + .await?; + + // invalid table cache + self.context + .cache_invalidator + .invalidate( + &Context::default(), + vec![CacheIdent::TableId(self.creator.data.physical_table_id)], + ) + .await?; + } else { + warn!("No physical columns found, leaving the physical table's schema unchanged"); + } + info!("Created {num_tables} tables {table_ids:?} metadata for physical table {physical_table_id}"); Ok(Status::done_with_output(table_ids)) @@ -275,11 +322,39 @@ impl CreateLogicalTablesProcedure { }); } - join_all(create_region_tasks) + // collect response from datanodes + let raw_schemas = join_all(create_region_tasks) .await .into_iter() + .map(|response| { + response.map(|mut response| response.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)) + }) .collect::>>()?; + if raw_schemas.is_empty() { + self.creator.data.state = CreateTablesState::CreateMetadata; + return Ok(Status::executing(false)); + } + + // verify all datanodes return the same raw schemas + // Safety: previous check ensures this vector is not empty. + let first = raw_schemas.first().unwrap(); + ensure!( + raw_schemas.iter().all(|x| x == first), + MetadataCorruptionSnafu { + err_msg: "Raw schemas from datanodes are not the same" + } + ); + + // decode raw schemas and store it + if let Some(raw_schema) = first { + let physical_columns = + ColumnMetadata::decode_list(raw_schema).context(DecodeJsonSnafu)?; + self.creator.data.physical_columns = physical_columns; + } else { + warn!("creating logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged"); + } + self.creator.data.state = CreateTablesState::CreateMetadata; // Ensures the procedures after the crash start from the `DatanodeCreateRegions` stage. @@ -357,6 +432,7 @@ impl TablesCreator { table_ids_already_exists: vec![None; len], physical_table_id, physical_region_numbers: vec![], + physical_columns: vec![], }, } } @@ -370,6 +446,7 @@ pub struct CreateTablesData { table_ids_already_exists: Vec>, physical_table_id: TableId, physical_region_numbers: Vec, + physical_columns: Vec, } impl CreateTablesData { @@ -420,6 +497,47 @@ impl CreateTablesData { }) .collect::>() } + + /// Generate the new physical table info. + /// + /// This method will consumes the physical columns. + fn build_new_physical_table_info( + &mut self, + old_table_info: &DeserializedValueWithBytes, + ) -> RawTableInfo { + let mut raw_table_info = old_table_info.deref().table_info.clone(); + + let existing_primary_key = raw_table_info + .meta + .schema + .column_schemas + .iter() + .map(|col| col.name.clone()) + .collect::>(); + let primary_key_indices = &mut raw_table_info.meta.primary_key_indices; + let value_indices = &mut raw_table_info.meta.value_indices; + value_indices.clear(); + let time_index = &mut raw_table_info.meta.schema.timestamp_index; + let columns = &mut raw_table_info.meta.schema.column_schemas; + columns.clear(); + + for (idx, col) in self.physical_columns.drain(..).enumerate() { + match col.semantic_type { + SemanticType::Tag => { + // push new primary key to the end. + if !existing_primary_key.contains(&col.column_schema.name) { + primary_key_indices.push(idx); + } + } + SemanticType::Field => value_indices.push(idx), + SemanticType::Timestamp => *time_index = Some(idx), + } + + columns.push(col.column_schema); + } + + raw_table_info + } } #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr)] diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 917fba197d..0dc9845850 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -270,8 +270,10 @@ impl RegionServerHandler for RegionServer { // merge results by sum up affected rows and merge extensions. let mut affected_rows = 0; + let mut extension = HashMap::new(); for result in results { affected_rows += result.affected_rows; + extension.extend(result.extension); } Ok(RegionResponse { @@ -282,7 +284,7 @@ impl RegionServerHandler for RegionServer { }), }), affected_rows: affected_rows as _, - extension: Default::default(), + extension, }) } } diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 84ab373dc9..03648168e0 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -252,7 +252,7 @@ async fn test_on_datanode_create_logical_regions() { let region_routes = test_data::new_region_routes(); let datanode_manager = new_datanode_manager(®ion_server, ®ion_routes).await; let physical_table_route = TableRouteValue::physical(region_routes); - let physical_table_id = 111; + let physical_table_id = 1; let task1 = create_table_task(Some("my_table1")); let task2 = create_table_task(Some("my_table2")); diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index ab6991a13c..41f14fd766 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -148,7 +148,6 @@ impl RegionEngine for MetricEngine { RegionRequest::Catchup(_) => Ok(0), }; - // TODO: pass extension result .map_err(BoxedError::new) .map(|rows| RegionHandleResult { diff --git a/tests/cases/standalone/common/create/create_metric_table.result b/tests/cases/standalone/common/create/create_metric_table.result index 3d4cb611cd..b6578c5ca9 100644 --- a/tests/cases/standalone/common/create/create_metric_table.result +++ b/tests/cases/standalone/common/create/create_metric_table.result @@ -19,19 +19,28 @@ CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) en Affected Rows: 0 -SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name; +SELECT table_catalog, table_schema, table_name, table_type, engine FROM information_schema.tables WHERE engine = 'metric' order by table_name; -Error: 3000(PlanQuery), Failed to plan SQL: No field named metric. Valid fields are information_schema.tables.table_catalog, information_schema.tables.table_schema, information_schema.tables.table_name, information_schema.tables.table_type, information_schema.tables.table_id, information_schema.tables.engine. ++---------------+--------------+------------+------------+--------+ +| table_catalog | table_schema | table_name | table_type | engine | ++---------------+--------------+------------+------------+--------+ +| greptime | public | phy | BASE TABLE | metric | +| greptime | public | t1 | BASE TABLE | metric | +| greptime | public | t2 | BASE TABLE | metric | ++---------------+--------------+------------+------------+--------+ --- We currently don't maintains physical table's schema. DESC TABLE phy; -+--------+----------------------+-----+------+---------+---------------+ -| Column | Type | Key | Null | Default | Semantic Type | -+--------+----------------------+-----+------+---------+---------------+ -| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP | -| val | Float64 | | YES | | FIELD | -+--------+----------------------+-----+------+---------+---------------+ ++------------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++------------+----------------------+-----+------+---------+---------------+ +| ts | TimestampMillisecond | | NO | | FIELD | +| val | Float64 | | YES | | FIELD | +| __table_id | UInt32 | PRI | NO | | TAG | +| __tsid | UInt64 | PRI | NO | | TAG | +| host | String | PRI | YES | | TAG | +| job | String | PRI | YES | | TAG | ++------------+----------------------+-----+------+---------+---------------+ DESC TABLE t1; diff --git a/tests/cases/standalone/common/create/create_metric_table.sql b/tests/cases/standalone/common/create/create_metric_table.sql index 3caf427d5a..28b3083d90 100644 --- a/tests/cases/standalone/common/create/create_metric_table.sql +++ b/tests/cases/standalone/common/create/create_metric_table.sql @@ -6,9 +6,8 @@ CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) e CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy"); -SELECT * FROM information_schema.tables WHERE engine = "metric" order by table_name; +SELECT table_catalog, table_schema, table_name, table_type, engine FROM information_schema.tables WHERE engine = 'metric' order by table_name; --- We currently don't maintains physical table's schema. DESC TABLE phy; DESC TABLE t1; diff --git a/tests/cases/standalone/common/insert/logical_metric_table.result b/tests/cases/standalone/common/insert/logical_metric_table.result index 390773c80f..a6e958d4a0 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.result +++ b/tests/cases/standalone/common/insert/logical_metric_table.result @@ -49,6 +49,30 @@ DROP TABLE t2; Affected Rows: 0 +DESC TABLE phy; + ++------------+----------------------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++------------+----------------------+-----+------+---------+---------------+ +| ts | TimestampMillisecond | | NO | | FIELD | +| val | Float64 | | YES | | FIELD | +| __table_id | UInt32 | PRI | NO | | TAG | +| __tsid | UInt64 | PRI | NO | | TAG | +| host | String | PRI | YES | | TAG | +| job | String | PRI | YES | | TAG | ++------------+----------------------+-----+------+---------+---------------+ + +SELECT ts, val, __tsid, host, job FROM phy; + ++-------------------------+-----+----------------------+-------+------+ +| ts | val | __tsid | host | job | ++-------------------------+-----+----------------------+-------+------+ +| 1970-01-01T00:00:00.001 | 1.0 | 1128149335081630826 | host2 | | +| 1970-01-01T00:00:00 | 0.0 | 18067404594631612786 | host1 | | +| 1970-01-01T00:00:00.001 | 1.0 | 2176048834144407834 | | job2 | +| 1970-01-01T00:00:00 | 0.0 | 15980333303142110493 | | job1 | ++-------------------------+-----+----------------------+-------+------+ + DROP TABLE phy; Affected Rows: 0 diff --git a/tests/cases/standalone/common/insert/logical_metric_table.sql b/tests/cases/standalone/common/insert/logical_metric_table.sql index a7ff6adbdf..2157d68707 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.sql +++ b/tests/cases/standalone/common/insert/logical_metric_table.sql @@ -18,4 +18,8 @@ DROP TABLE t1; DROP TABLE t2; +DESC TABLE phy; + +SELECT ts, val, __tsid, host, job FROM phy; + DROP TABLE phy; diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 0edf7e471e..ea3e3e1bc1 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -199,6 +199,8 @@ impl Env { }; let log_file_name = self.data_home.join(log_file_name).display().to_string(); + println!("{subcommand} log file at {log_file_name}"); + let log_file = OpenOptions::new() .create(true) .write(true)