From 175fddb3b5cfd3ddf320a3ce38d1192d2bb25ce4 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Wed, 13 Nov 2024 15:02:35 +0800 Subject: [PATCH] fix: alter table add column id alloc mismatch (#4972) * fix: alter table add column id alloc mismatch * chore: remove debug code * chore: typos * chore: error variant * chore: more checks for invariant * refactor: better check&explain * fix: exist column metadata correct * chore: remove unused error variant --- src/metric-engine/src/data_region.rs | 4 +- src/metric-engine/src/engine/alter.rs | 40 +++++--- src/metric-engine/src/engine/create.rs | 61 +++++++++--- src/metric-engine/src/metadata_region.rs | 2 + .../common/alter/alter_table.result | 99 +++++++++++++++++++ .../standalone/common/alter/alter_table.sql | 62 ++++++++++++ 6 files changed, 237 insertions(+), 31 deletions(-) diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 0ed19db600..87db153631 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -15,8 +15,7 @@ use api::v1::SemanticType; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_telemetry::info; -use common_telemetry::tracing::warn; +use common_telemetry::{debug, info, warn}; use mito2::engine::MitoEngine; use snafu::ResultExt; use store_api::metadata::ColumnMetadata; @@ -150,6 +149,7 @@ impl DataRegion { }) .collect::>()?; + debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}"); // assemble alter request let alter_request = RegionRequest::Alter(RegionAlterRequest { schema_version: version, diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 76066ab97a..f4a4811f72 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -92,17 +92,29 @@ impl MetricEngineInner { let metadata_region_id = to_metadata_region_id(physical_region_id); let mut columns_to_add = vec![]; + // columns that already exist in physical region + let mut existing_columns = vec![]; + + let pre_existing_physical_columns = self + .data_region + .physical_columns(physical_region_id) + .await?; + + let pre_exist_cols = pre_existing_physical_columns + .iter() + .map(|col| (col.column_schema.name.as_str(), col)) + .collect::>(); + + // check pre-existing physical columns so if any columns to add is already exist, + // we can skip it in physical alter operation + // (but still need to update them in logical alter operation) for col in &columns { - if self - .metadata_region - .column_semantic_type( - metadata_region_id, - logical_region_id, - &col.column_metadata.column_schema.name, - ) - .await? - .is_none() + if let Some(exist_column) = + pre_exist_cols.get(&col.column_metadata.column_schema.name.as_str()) { + // push the correct column schema with correct column id + existing_columns.push(*exist_column); + } else { columns_to_add.push(col.column_metadata.clone()); } } @@ -111,16 +123,16 @@ impl MetricEngineInner { let data_region_id = to_data_region_id(physical_region_id); self.add_columns_to_physical_data_region( data_region_id, - metadata_region_id, logical_region_id, - columns_to_add, + &mut columns_to_add, ) .await?; - // register columns to logical region - for col in columns { + // note here we don't use `columns` directly but concat `existing_columns` with `columns_to_add` to get correct metadata + // about already existing columns + for metadata in existing_columns.into_iter().chain(columns_to_add.iter()) { self.metadata_region - .add_column(metadata_region_id, logical_region_id, &col.column_metadata) + .add_column(metadata_region_id, logical_region_id, metadata) .await?; } diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 9ca89248dc..b90d490058 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; use api::v1::SemanticType; use common_error::ext::BoxedError; -use common_telemetry::info; +use common_telemetry::{info, warn}; use common_time::Timestamp; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -212,11 +212,17 @@ impl MetricEngineInner { self.add_columns_to_physical_data_region( data_region_id, - metadata_region_id, logical_region_id, - new_columns, + &mut new_columns, ) .await?; + + // register columns to metadata region + for col in &new_columns { + self.metadata_region + .add_column(metadata_region_id, logical_region_id, col) + .await?; + } } // register logical region to metadata region @@ -260,27 +266,24 @@ impl MetricEngineInner { Ok(data_region_id) } - /// Execute corresponding alter requests to mito region. New added columns' [ColumnMetadata] will be - /// cloned into `added_columns`. + /// Execute corresponding alter requests to mito region. After calling this, `new_columns` will be assign a new column id + /// which should be correct if the following requirements are met: + /// + /// # NOTE + /// + /// `new_columns` MUST NOT pre-exist in the physical region. Or the results will be wrong column id for the new columns. + /// pub(crate) async fn add_columns_to_physical_data_region( &self, data_region_id: RegionId, - metadata_region_id: RegionId, logical_region_id: RegionId, - mut new_columns: Vec, + new_columns: &mut [ColumnMetadata], ) -> Result<()> { // alter data region self.data_region - .add_columns(data_region_id, &mut new_columns) + .add_columns(data_region_id, new_columns) .await?; - // register columns to metadata region - for col in &new_columns { - self.metadata_region - .add_column(metadata_region_id, logical_region_id, col) - .await?; - } - // safety: previous step has checked this self.state.write().unwrap().add_physical_columns( data_region_id, @@ -291,6 +294,34 @@ impl MetricEngineInner { info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}"); PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _); + // correct the column id + let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?; + let after_alter_physical_schema_map = after_alter_physical_schema + .iter() + .map(|metadata| (metadata.column_schema.name.as_str(), metadata)) + .collect::>(); + + // double check to make sure column ids are not mismatched + // shouldn't be a expensive operation, given it only query for physical columns + for col in new_columns.iter_mut() { + let column_metadata = after_alter_physical_schema_map + .get(&col.column_schema.name.as_str()) + .with_context(|| ColumnNotFoundSnafu { + name: &col.column_schema.name, + region_id: data_region_id, + })?; + if col != *column_metadata { + warn!( + "Add already existing columns with different column metadata to physical region({:?}): new column={:?}, old column={:?}", + data_region_id, + col, + column_metadata + ); + // update to correct metadata + *col = (*column_metadata).clone(); + } + } + Ok(()) } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index b02fa3de51..e440eb1765 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -211,6 +211,7 @@ impl MetadataRegion { } /// Check if the given column exists. Return the semantic type if exists. + #[cfg(test)] pub async fn column_semantic_type( &self, physical_region_id: RegionId, @@ -373,6 +374,7 @@ impl MetadataRegion { /// Retrieves the value associated with the given key in the specified region. /// Returns `Ok(None)` if the key is not found. + #[cfg(test)] pub async fn get(&self, region_id: RegionId, key: &str) -> Result> { let scan_req = Self::build_read_request(key); let record_batch_stream = self diff --git a/tests/cases/standalone/common/alter/alter_table.result b/tests/cases/standalone/common/alter/alter_table.result index bd503fe866..120e7695d0 100644 --- a/tests/cases/standalone/common/alter/alter_table.result +++ b/tests/cases/standalone/common/alter/alter_table.result @@ -71,3 +71,102 @@ DROP TABLE test_alt_table; Affected Rows: 0 +-- to test if same name column can be added +CREATE TABLE phy (ts timestamp time index, val double) engine = metric with ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE t1 ( + ts timestamp time index, + val double, + host string primary key +) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +INSERT INTO + t1 +VALUES + ('host1', 0, 1), + ('host2', 1, 0,); + +Affected Rows: 2 + +SELECT + * +FROM + t1; + ++-------+-------------------------+-----+ +| host | ts | val | ++-------+-------------------------+-----+ +| host2 | 1970-01-01T00:00:00.001 | 0.0 | +| host1 | 1970-01-01T00:00:00 | 1.0 | ++-------+-------------------------+-----+ + +CREATE TABLE t2 ( + ts timestamp time index, + job string primary key, + val double +) engine = metric with ("on_physical_table" = "phy"); + +Affected Rows: 0 + +ALTER TABLE + t1 +ADD + COLUMN `at` STRING; + +Affected Rows: 0 + +ALTER TABLE + t2 +ADD + COLUMN at3 STRING; + +Affected Rows: 0 + +ALTER TABLE + t2 +ADD + COLUMN `at` STRING; + +Affected Rows: 0 + +ALTER TABLE + t2 +ADD + COLUMN at2 STRING; + +Affected Rows: 0 + +INSERT INTO + t2 +VALUES + ("loc_1", "loc_2", "loc_3", 'job1', 0, 1); + +Affected Rows: 1 + +SELECT + * +FROM + t2; + ++-------+-------+-------+------+---------------------+-----+ +| at | at2 | at3 | job | ts | val | ++-------+-------+-------+------+---------------------+-----+ +| loc_1 | loc_2 | loc_3 | job1 | 1970-01-01T00:00:00 | 1.0 | ++-------+-------+-------+------+---------------------+-----+ + +DROP TABLE t1; + +Affected Rows: 0 + +DROP TABLE t2; + +Affected Rows: 0 + +DROP TABLE phy; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/alter_table.sql b/tests/cases/standalone/common/alter/alter_table.sql index d77e66cc45..7f3e0b6640 100644 --- a/tests/cases/standalone/common/alter/alter_table.sql +++ b/tests/cases/standalone/common/alter/alter_table.sql @@ -20,3 +20,65 @@ ALTER TABLE test_alt_table ADD COLUMN m INTEGER; DESC TABLE test_alt_table; DROP TABLE test_alt_table; + +-- to test if same name column can be added +CREATE TABLE phy (ts timestamp time index, val double) engine = metric with ("physical_metric_table" = ""); + +CREATE TABLE t1 ( + ts timestamp time index, + val double, + host string primary key +) engine = metric with ("on_physical_table" = "phy"); + +INSERT INTO + t1 +VALUES + ('host1', 0, 1), + ('host2', 1, 0,); + +SELECT + * +FROM + t1; + +CREATE TABLE t2 ( + ts timestamp time index, + job string primary key, + val double +) engine = metric with ("on_physical_table" = "phy"); + +ALTER TABLE + t1 +ADD + COLUMN `at` STRING; + +ALTER TABLE + t2 +ADD + COLUMN at3 STRING; + +ALTER TABLE + t2 +ADD + COLUMN `at` STRING; + +ALTER TABLE + t2 +ADD + COLUMN at2 STRING; + +INSERT INTO + t2 +VALUES + ("loc_1", "loc_2", "loc_3", 'job1', 0, 1); + +SELECT + * +FROM + t2; + +DROP TABLE t1; + +DROP TABLE t2; + +DROP TABLE phy;