feat: add all partition column to logical table automatically (#6711)

* feat: add all partition column to logical table automatically

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify builder

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix request builder

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test: update sqlness

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: discord9 <discord9@163.com>
This commit is contained in:
Ruihang Xia
2025-08-12 10:25:24 -07:00
committed by GitHub
parent ff5d672583
commit 9ad9a7d2bc
4 changed files with 186 additions and 59 deletions

View File

@@ -68,6 +68,7 @@ impl CreateLogicalTablesProcedure {
physical_table_id,
physical_region_numbers: vec![],
physical_columns: vec![],
physical_partition_columns: vec![],
},
}
}
@@ -91,6 +92,8 @@ impl CreateLogicalTablesProcedure {
self.check_input_tasks()?;
// Sets physical region numbers
self.fill_physical_table_info().await?;
// Add partition columns from physical table to logical table schemas
self.merge_partition_columns_into_logical_tables()?;
// Checks if the tables exist
self.check_tables_already_exist().await?;
@@ -257,6 +260,7 @@ pub struct CreateTablesData {
physical_table_id: TableId,
physical_region_numbers: Vec<RegionNumber>,
physical_columns: Vec<ColumnMetadata>,
physical_partition_columns: Vec<String>,
}
impl CreateTablesData {

View File

@@ -12,6 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use snafu::OptionExt;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::error::Result;
use crate::key::table_route::TableRouteValue;
@@ -28,6 +34,89 @@ impl CreateLogicalTablesProcedure {
self.data.physical_region_numbers = physical_region_numbers;
// Extract partition column names from the physical table
let physical_table_info = self
.context
.table_metadata_manager
.table_info_manager()
.get(self.data.physical_table_id)
.await?
.with_context(|| crate::error::TableInfoNotFoundSnafu {
table: format!("physical table {}", self.data.physical_table_id),
})?;
let physical_partition_columns: Vec<String> = physical_table_info
.table_info
.meta
.partition_key_indices
.iter()
.map(|&idx| {
physical_table_info.table_info.meta.schema.column_schemas[idx]
.name
.clone()
})
.collect();
self.data.physical_partition_columns = physical_partition_columns;
Ok(())
}
pub(crate) fn merge_partition_columns_into_logical_tables(&mut self) -> Result<()> {
let partition_columns = &self.data.physical_partition_columns;
// Skip if no partition columns to add
if partition_columns.is_empty() {
return Ok(());
}
for task in &mut self.data.tasks {
// Get existing column names in the logical table
let existing_column_names: HashSet<_> = task
.table_info
.meta
.schema
.column_schemas
.iter()
.map(|c| &c.name)
.collect();
let mut new_columns = Vec::new();
let mut new_primary_key_indices = task.table_info.meta.primary_key_indices.clone();
// Add missing partition columns
for partition_column in partition_columns {
if !existing_column_names.contains(partition_column) {
let new_column_index =
task.table_info.meta.schema.column_schemas.len() + new_columns.len();
// Create new column schema for the partition column
let column_schema = ColumnSchema::new(
partition_column.clone(),
ConcreteDataType::string_datatype(),
true,
);
new_columns.push(column_schema);
// Add to primary key indices (partition columns are part of primary key)
new_primary_key_indices.push(new_column_index);
}
}
// If we added new columns, update the table info
if !new_columns.is_empty() {
let mut updated_columns = task.table_info.meta.schema.column_schemas.clone();
updated_columns.extend(new_columns);
// Create new schema with updated columns
let new_schema = RawSchema::new(updated_columns);
// Update the table info
task.table_info.meta.schema = new_schema;
task.table_info.meta.primary_key_indices = new_primary_key_indices;
}
}
Ok(())
}

View File

@@ -19,9 +19,12 @@ use api::v1::CreateTableExpr;
use common_telemetry::debug;
use common_telemetry::tracing_context::TracingContext;
use store_api::storage::{RegionId, TableId};
use table::metadata::RawTableInfo;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::create_table_template::{
build_template, build_template_from_raw_table_info, CreateRequestBuilder,
};
use crate::ddl::utils::region_storage_path;
use crate::error::Result;
use crate::peer::Peer;
@@ -47,8 +50,10 @@ impl CreateLogicalTablesProcedure {
let logical_table_id = task.table_info.ident.table_id;
let physical_table_id = self.data.physical_table_id;
let storage_path = region_storage_path(catalog, schema);
let request_builder =
create_region_request_builder(&task.create_table, physical_table_id)?;
let request_builder = create_region_request_builder_from_raw_table_info(
&task.table_info,
physical_table_id,
)?;
for region_number in &regions_on_this_peer {
let region_id = RegionId::new(logical_table_id, *region_number);
@@ -73,7 +78,7 @@ impl CreateLogicalTablesProcedure {
}
}
/// Creates a region request builder.
/// Creates a region request builder
pub fn create_region_request_builder(
create_table_expr: &CreateTableExpr,
physical_table_id: TableId,
@@ -81,3 +86,14 @@ pub fn create_region_request_builder(
let template = build_template(create_table_expr)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}
/// Builds a [CreateRequestBuilder] from a [RawTableInfo].
///
/// Note: **This method is only used for creating logical tables.**
pub fn create_region_request_builder_from_raw_table_info(
raw_table_info: &RawTableInfo,
physical_table_id: TableId,
) -> Result<CreateRequestBuilder> {
let template = build_template_from_raw_table_info(raw_table_info)?;
Ok(CreateRequestBuilder::new(template, Some(physical_table_id)))
}

View File

@@ -60,26 +60,28 @@ Affected Rows: 3
show create table logical_table_2;
+-----------------+-------------------------------------------------+
| Table | Create Table |
+-----------------+-------------------------------------------------+
| logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( |
| | "cpu" DOUBLE NULL, |
| | "host" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | PARTITION ON COLUMNS ("host") ( |
| | host <= 'host1', |
| | host > 'host1' AND host <= 'host2', |
| | host > 'host2' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | on_physical_table = 'metric_engine_partition' |
| | ) |
+-----------------+-------------------------------------------------+
+-----------------+-------------------------------------------------------------------------------+
| Table | Create Table |
+-----------------+-------------------------------------------------------------------------------+
| logical_table_2 | CREATE TABLE IF NOT EXISTS "logical_table_2" ( |
| | "another_partition_key" STRING NULL, |
| | "cpu" DOUBLE NULL, |
| | "host" STRING NULL, |
| | "one_partition_key" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("another_partition_key", "host", "one_partition_key") |
| | ) |
| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( |
| | host <= 'host1', |
| | host > 'host1' AND host <= 'host2', |
| | host > 'host2' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | on_physical_table = 'metric_engine_partition' |
| | ) |
+-----------------+-------------------------------------------------------------------------------+
select count(*) from logical_table_2;
@@ -111,14 +113,15 @@ select host, count(*) from logical_table_2 GROUP BY host ORDER BY host;
+-+-+
| plan_type_| plan_|
+-+-+
| logical_plan_| MergeSort: logical_table_2.host ASC NULLS LAST_|
| logical_plan_| Sort: logical_table_2.host ASC NULLS LAST_|
|_|_Aggregate: groupBy=[[logical_table_2.host]], aggr=[[__count_merge(__count_state(logical_table_2.ts)) AS count(*)]] |
|_|_MergeScan [is_placeholder=false, remote_input=[_|
|_| Sort: logical_table_2.host ASC NULLS LAST_|
|_|_Projection: logical_table_2.host, count(*)_|
|_|_Aggregate: groupBy=[[logical_table_2.host]], aggr=[[count(logical_table_2.ts) AS count(*)]]_|
|_| Aggregate: groupBy=[[logical_table_2.host]], aggr=[[__count_state(logical_table_2.ts)]]_|
|_|_TableScan: logical_table_2_|
|_| ]]_|
| physical_plan | SortPreservingMergeExec: [host@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[host@0 ASC NULLS LAST], preserve_partitioning=[true]_|
|_|_AggregateExec: mode=SinglePartitioned, gby=[host@0 as host], aggr=[count(*)]_|
|_|_MergeScanExec: REDACTED
|_|_|
+-+-+
@@ -177,23 +180,30 @@ Affected Rows: 0
show create table logical_table_3;
+-----------------+-------------------------------------------------+
| Table | Create Table |
+-----------------+-------------------------------------------------+
| logical_table_3 | CREATE TABLE IF NOT EXISTS "logical_table_3" ( |
| | "a" STRING NULL, |
| | "cpu" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "z" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a", "z") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | on_physical_table = 'metric_engine_partition' |
| | ) |
+-----------------+-------------------------------------------------+
+-----------------+--------------------------------------------------------------------------------+
| Table | Create Table |
+-----------------+--------------------------------------------------------------------------------+
| logical_table_3 | CREATE TABLE IF NOT EXISTS "logical_table_3" ( |
| | "a" STRING NULL, |
| | "another_partition_key" STRING NULL, |
| | "cpu" DOUBLE NULL, |
| | "host" STRING NULL, |
| | "one_partition_key" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "z" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("a", "another_partition_key", "host", "one_partition_key", "z") |
| | ) |
| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( |
| | host <= 'host1', |
| | host > 'host1' AND host <= 'host2', |
| | host > 'host2' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | on_physical_table = 'metric_engine_partition' |
| | ) |
+-----------------+--------------------------------------------------------------------------------+
insert into logical_table_3(ts, a, z, cpu) values
('2023-01-01 00:00:00', 'a1', 'z1', 1.0),
@@ -262,20 +272,28 @@ Affected Rows: 0
show create table logical_table_4;
+-----------------+-------------------------------------------------+
| Table | Create Table |
+-----------------+-------------------------------------------------+
| logical_table_4 | CREATE TABLE IF NOT EXISTS "logical_table_4" ( |
| | "cpu" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | on_physical_table = 'metric_engine_partition' |
| | ) |
+-----------------+-------------------------------------------------+
+-----------------+-------------------------------------------------------------------------------+
| Table | Create Table |
+-----------------+-------------------------------------------------------------------------------+
| logical_table_4 | CREATE TABLE IF NOT EXISTS "logical_table_4" ( |
| | "another_partition_key" STRING NULL, |
| | "cpu" DOUBLE NULL, |
| | "host" STRING NULL, |
| | "one_partition_key" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("another_partition_key", "host", "one_partition_key") |
| | ) |
| | PARTITION ON COLUMNS ("host", "one_partition_key", "another_partition_key") ( |
| | host <= 'host1', |
| | host > 'host1' AND host <= 'host2', |
| | host > 'host2' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | on_physical_table = 'metric_engine_partition' |
| | ) |
+-----------------+-------------------------------------------------------------------------------+
insert into logical_table_4(ts, cpu) values
('2023-01-01 00:00:00', 1.0),