diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 3b0a091a14..312251d6b4 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -68,6 +68,7 @@ impl CreateLogicalTablesProcedure { physical_table_id, physical_region_numbers: vec![], physical_columns: vec![], + physical_partition_columns: vec![], }, } } @@ -91,6 +92,8 @@ impl CreateLogicalTablesProcedure { self.check_input_tasks()?; // Sets physical region numbers self.fill_physical_table_info().await?; + // Add partition columns from physical table to logical table schemas + self.merge_partition_columns_into_logical_tables()?; // Checks if the tables exist self.check_tables_already_exist().await?; @@ -257,6 +260,7 @@ pub struct CreateTablesData { physical_table_id: TableId, physical_region_numbers: Vec, physical_columns: Vec, + physical_partition_columns: Vec, } impl CreateTablesData { diff --git a/src/common/meta/src/ddl/create_logical_tables/metadata.rs b/src/common/meta/src/ddl/create_logical_tables/metadata.rs index 89f5c374af..339c3b463b 100644 --- a/src/common/meta/src/ddl/create_logical_tables/metadata.rs +++ b/src/common/meta/src/ddl/create_logical_tables/metadata.rs @@ -12,6 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, RawSchema}; +use snafu::OptionExt; + use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::error::Result; use crate::key::table_route::TableRouteValue; @@ -28,6 +34,89 @@ impl CreateLogicalTablesProcedure { self.data.physical_region_numbers = physical_region_numbers; + // Extract partition column names from the physical table + let physical_table_info = self + .context + .table_metadata_manager + .table_info_manager() + .get(self.data.physical_table_id) + .await? + .with_context(|| crate::error::TableInfoNotFoundSnafu { + table: format!("physical table {}", self.data.physical_table_id), + })?; + + let physical_partition_columns: Vec = physical_table_info + .table_info + .meta + .partition_key_indices + .iter() + .map(|&idx| { + physical_table_info.table_info.meta.schema.column_schemas[idx] + .name + .clone() + }) + .collect(); + + self.data.physical_partition_columns = physical_partition_columns; + + Ok(()) + } + + pub(crate) fn merge_partition_columns_into_logical_tables(&mut self) -> Result<()> { + let partition_columns = &self.data.physical_partition_columns; + + // Skip if no partition columns to add + if partition_columns.is_empty() { + return Ok(()); + } + + for task in &mut self.data.tasks { + // Get existing column names in the logical table + let existing_column_names: HashSet<_> = task + .table_info + .meta + .schema + .column_schemas + .iter() + .map(|c| &c.name) + .collect(); + + let mut new_columns = Vec::new(); + let mut new_primary_key_indices = task.table_info.meta.primary_key_indices.clone(); + + // Add missing partition columns + for partition_column in partition_columns { + if !existing_column_names.contains(partition_column) { + let new_column_index = + task.table_info.meta.schema.column_schemas.len() + new_columns.len(); + + // Create new column schema for the partition column + let column_schema = ColumnSchema::new( + partition_column.clone(), + ConcreteDataType::string_datatype(), + true, + ); + new_columns.push(column_schema); + + // Add to primary key indices (partition columns are part of primary key) + new_primary_key_indices.push(new_column_index); + } + } + + // If we added new columns, update the table info + if !new_columns.is_empty() { + let mut updated_columns = task.table_info.meta.schema.column_schemas.clone(); + updated_columns.extend(new_columns); + + // Create new schema with updated columns + let new_schema = RawSchema::new(updated_columns); + + // Update the table info + task.table_info.meta.schema = new_schema; + task.table_info.meta.primary_key_indices = new_primary_key_indices; + } + } + Ok(()) } diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index ecf951cd06..de00230d68 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -19,9 +19,12 @@ use api::v1::CreateTableExpr; use common_telemetry::debug; use common_telemetry::tracing_context::TracingContext; use store_api::storage::{RegionId, TableId}; +use table::metadata::RawTableInfo; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; -use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; +use crate::ddl::create_table_template::{ + build_template, build_template_from_raw_table_info, CreateRequestBuilder, +}; use crate::ddl::utils::region_storage_path; use crate::error::Result; use crate::peer::Peer; @@ -47,8 +50,10 @@ impl CreateLogicalTablesProcedure { let logical_table_id = task.table_info.ident.table_id; let physical_table_id = self.data.physical_table_id; let storage_path = region_storage_path(catalog, schema); - let request_builder = - create_region_request_builder(&task.create_table, physical_table_id)?; + let request_builder = create_region_request_builder_from_raw_table_info( + &task.table_info, + physical_table_id, + )?; for region_number in ®ions_on_this_peer { let region_id = RegionId::new(logical_table_id, *region_number); @@ -73,7 +78,7 @@ impl CreateLogicalTablesProcedure { } } -/// Creates a region request builder. +/// Creates a region request builder pub fn create_region_request_builder( create_table_expr: &CreateTableExpr, physical_table_id: TableId, @@ -81,3 +86,14 @@ pub fn create_region_request_builder( let template = build_template(create_table_expr)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } + +/// Builds a [CreateRequestBuilder] from a [RawTableInfo]. +/// +/// Note: **This method is only used for creating logical tables.** +pub fn create_region_request_builder_from_raw_table_info( + raw_table_info: &RawTableInfo, + physical_table_id: TableId, +) -> Result { + let template = build_template_from_raw_table_info(raw_table_info)?; + Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) +} diff --git a/tests/cases/standalone/common/create/metric_engine_partition.result b/tests/cases/standalone/common/create/metric_engine_partition.result index 8dda7887e7..1b7a943aab 100644 --- a/tests/cases/standalone/common/create/metric_engine_partition.result +++ b/tests/cases/standalone/common/create/metric_engine_partition.result @@ -60,26 +60,28 @@ Affected Rows: 3 show create table logical_table_2; -+-----------------+-------------------------------------------------+ -| Table | Create Table | -+-----------------+-------------------------------------------------+ -| logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( | -| | "cpu" DOUBLE NULL, | -| | "host" STRING NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("host") | -| | ) | -| | PARTITION ON COLUMNS ("host") ( | -| | host <= 'host1', | -| | host > 'host1' AND host <= 'host2', | -| | host > 'host2' | -| | ) | -| | ENGINE=metric | -| | WITH( | -| | on_physical_table = 'metric_engine_partition' | -| | ) | -+-----------------+-------------------------------------------------+ ++-----------------+-------------------------------------------------------------------------------+ +| Table | Create Table | ++-----------------+-------------------------------------------------------------------------------+ +| logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( | +| | "another_partition_key" STRING NULL, | +| | "cpu" DOUBLE NULL, | +| | "host" STRING NULL, | +| | "one_partition_key" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("another_partition_key", "host", "one_partition_key") | +| | ) | +| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( | +| | host <= 'host1', | +| | host > 'host1' AND host <= 'host2', | +| | host > 'host2' | +| | ) | +| | ENGINE=metric | +| | WITH( | +| | on_physical_table = 'metric_engine_partition' | +| | ) | ++-----------------+-------------------------------------------------------------------------------+ select count(*) from logical_table_2; @@ -111,14 +113,15 @@ select host, count(*) from logical_table_2 GROUP BY host ORDER BY host; +-+-+ | plan_type_| plan_| +-+-+ -| logical_plan_| MergeSort: logical_table_2.host ASC NULLS LAST_| +| logical_plan_| Sort: logical_table_2.host ASC NULLS LAST_| +|_|_Aggregate: groupBy=[[logical_table_2.host]], aggr=[[__count_merge(__count_state(logical_table_2.ts)) AS count(*)]] | |_|_MergeScan [is_placeholder=false, remote_input=[_| -|_| Sort: logical_table_2.host ASC NULLS LAST_| -|_|_Projection: logical_table_2.host, count(*)_| -|_|_Aggregate: groupBy=[[logical_table_2.host]], aggr=[[count(logical_table_2.ts) AS count(*)]]_| +|_| Aggregate: groupBy=[[logical_table_2.host]], aggr=[[__count_state(logical_table_2.ts)]]_| |_|_TableScan: logical_table_2_| |_| ]]_| | physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_| +|_|_SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[true]_| +|_|_AggregateExec: mode=SinglePartitioned, gby=[host@0 as host], aggr=[count(*)]_| |_|_MergeScanExec: REDACTED |_|_| +-+-+ @@ -177,23 +180,30 @@ Affected Rows: 0 show create table logical_table_3; -+-----------------+-------------------------------------------------+ -| Table | Create Table | -+-----------------+-------------------------------------------------+ -| logical_table_3 | CREATE TABLE IF NOT EXISTS "logical_table_3" ( | -| | "a" STRING NULL, | -| | "cpu" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "z" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("a", "z") | -| | ) | -| | | -| | ENGINE=metric | -| | WITH( | -| | on_physical_table = 'metric_engine_partition' | -| | ) | -+-----------------+-------------------------------------------------+ ++-----------------+--------------------------------------------------------------------------------+ +| Table | Create Table | ++-----------------+--------------------------------------------------------------------------------+ +| logical_table_3 | CREATE TABLE IF NOT EXISTS "logical_table_3" ( | +| | "a" STRING NULL, | +| | "another_partition_key" STRING NULL, | +| | "cpu" DOUBLE NULL, | +| | "host" STRING NULL, | +| | "one_partition_key" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "z" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("a", "another_partition_key", "host", "one_partition_key", "z") | +| | ) | +| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( | +| | host <= 'host1', | +| | host > 'host1' AND host <= 'host2', | +| | host > 'host2' | +| | ) | +| | ENGINE=metric | +| | WITH( | +| | on_physical_table = 'metric_engine_partition' | +| | ) | ++-----------------+--------------------------------------------------------------------------------+ insert into logical_table_3(ts, a, z, cpu) values ('2023-01-01 00:00:00', 'a1', 'z1', 1.0), @@ -262,20 +272,28 @@ Affected Rows: 0 show create table logical_table_4; -+-----------------+-------------------------------------------------+ -| Table | Create Table | -+-----------------+-------------------------------------------------+ -| logical_table_4 | CREATE TABLE IF NOT EXISTS "logical_table_4" ( | -| | "cpu" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("ts") | -| | ) | -| | | -| | ENGINE=metric | -| | WITH( | -| | on_physical_table = 'metric_engine_partition' | -| | ) | -+-----------------+-------------------------------------------------+ ++-----------------+-------------------------------------------------------------------------------+ +| Table | Create Table | ++-----------------+-------------------------------------------------------------------------------+ +| logical_table_4 | CREATE TABLE IF NOT EXISTS "logical_table_4" ( | +| | "another_partition_key" STRING NULL, | +| | "cpu" DOUBLE NULL, | +| | "host" STRING NULL, | +| | "one_partition_key" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("another_partition_key", "host", "one_partition_key") | +| | ) | +| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( | +| | host <= 'host1', | +| | host > 'host1' AND host <= 'host2', | +| | host > 'host2' | +| | ) | +| | ENGINE=metric | +| | WITH( | +| | on_physical_table = 'metric_engine_partition' | +| | ) | ++-----------------+-------------------------------------------------------------------------------+ insert into logical_table_4(ts, cpu) values ('2023-01-01 00:00:00', 1.0),