fix: alter table add column id alloc mismatch (#4972)

* fix: alter table add column id alloc mismatch

* chore: remove debug code

* chore: typos

* chore: error variant

* chore: more checks for invariant

* refactor: better check&explain

* fix: exist column metadata correct

* chore: remove unused error variant
This commit is contained in:
discord9
2024-11-13 15:02:35 +08:00
committed by GitHub
parent 6afc4e778a
commit 175fddb3b5
6 changed files with 237 additions and 31 deletions

View File

@@ -15,8 +15,7 @@
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info, warn};
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
@@ -150,6 +149,7 @@ impl DataRegion {
})
.collect::<Result<_>>()?;
debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
// assemble alter request
let alter_request = RegionRequest::Alter(RegionAlterRequest {
schema_version: version,

View File

@@ -92,17 +92,29 @@ impl MetricEngineInner {
let metadata_region_id = to_metadata_region_id(physical_region_id);
let mut columns_to_add = vec![];
// columns that already exist in physical region
let mut existing_columns = vec![];
let pre_existing_physical_columns = self
.data_region
.physical_columns(physical_region_id)
.await?;
let pre_exist_cols = pre_existing_physical_columns
.iter()
.map(|col| (col.column_schema.name.as_str(), col))
.collect::<HashMap<_, _>>();
// check the pre-existing physical columns so that any column to add which already exists
// can be skipped in the physical alter operation
// (but it still needs to be updated in the logical alter operation)
for col in &columns {
if self
.metadata_region
.column_semantic_type(
metadata_region_id,
logical_region_id,
&col.column_metadata.column_schema.name,
)
.await?
.is_none()
if let Some(exist_column) =
pre_exist_cols.get(&col.column_metadata.column_schema.name.as_str())
{
// push the correct column schema with correct column id
existing_columns.push(*exist_column);
} else {
columns_to_add.push(col.column_metadata.clone());
}
}
@@ -111,16 +123,16 @@ impl MetricEngineInner {
let data_region_id = to_data_region_id(physical_region_id);
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_region_id,
columns_to_add,
&mut columns_to_add,
)
.await?;
// register columns to logical region
for col in columns {
// note: we don't use `columns` directly here; instead we chain `existing_columns` with `columns_to_add` to get the correct metadata
// for columns that already exist
for metadata in existing_columns.into_iter().chain(columns_to_add.iter()) {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, &col.column_metadata)
.add_column(metadata_region_id, logical_region_id, metadata)
.await?;
}

View File

@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_telemetry::info;
use common_telemetry::{info, warn};
use common_time::Timestamp;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -212,11 +212,17 @@ impl MetricEngineInner {
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_region_id,
new_columns,
&mut new_columns,
)
.await?;
// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}
}
// register logical region to metadata region
@@ -260,27 +266,24 @@ impl MetricEngineInner {
Ok(data_region_id)
}
/// Execute corresponding alter requests to mito region. New added columns' [ColumnMetadata] will be
/// cloned into `added_columns`.
/// Execute the corresponding alter requests on the mito region. After calling this, each column in `new_columns` will be assigned a new column id,
/// which should be correct if the following requirement is met:
///
/// # NOTE
///
/// `new_columns` MUST NOT pre-exist in the physical region; otherwise the new columns will be given wrong column ids.
///
pub(crate) async fn add_columns_to_physical_data_region(
&self,
data_region_id: RegionId,
metadata_region_id: RegionId,
logical_region_id: RegionId,
mut new_columns: Vec<ColumnMetadata>,
new_columns: &mut [ColumnMetadata],
) -> Result<()> {
// alter data region
self.data_region
.add_columns(data_region_id, &mut new_columns)
.add_columns(data_region_id, new_columns)
.await?;
// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(metadata_region_id, logical_region_id, col)
.await?;
}
// safety: previous step has checked this
self.state.write().unwrap().add_physical_columns(
data_region_id,
@@ -291,6 +294,34 @@ impl MetricEngineInner {
info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}");
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
// correct the column id
let after_alter_physical_schema = self.data_region.physical_columns(data_region_id).await?;
let after_alter_physical_schema_map = after_alter_physical_schema
.iter()
.map(|metadata| (metadata.column_schema.name.as_str(), metadata))
.collect::<HashMap<_, _>>();
// double check to make sure column ids are not mismatched
// this shouldn't be an expensive operation, given that it only queries the physical columns
for col in new_columns.iter_mut() {
let column_metadata = after_alter_physical_schema_map
.get(&col.column_schema.name.as_str())
.with_context(|| ColumnNotFoundSnafu {
name: &col.column_schema.name,
region_id: data_region_id,
})?;
if col != *column_metadata {
warn!(
"Add already existing columns with different column metadata to physical region({:?}): new column={:?}, old column={:?}",
data_region_id,
col,
column_metadata
);
// update to correct metadata
*col = (*column_metadata).clone();
}
}
Ok(())
}

View File

@@ -211,6 +211,7 @@ impl MetadataRegion {
}
/// Check if the given column exists. Return the semantic type if exists.
#[cfg(test)]
pub async fn column_semantic_type(
&self,
physical_region_id: RegionId,
@@ -373,6 +374,7 @@ impl MetadataRegion {
/// Retrieves the value associated with the given key in the specified region.
/// Returns `Ok(None)` if the key is not found.
#[cfg(test)]
pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self

View File

@@ -71,3 +71,102 @@ DROP TABLE test_alt_table;
Affected Rows: 0
-- to test if same name column can be added
CREATE TABLE phy (ts timestamp time index, val double) engine = metric with ("physical_metric_table" = "");
Affected Rows: 0
CREATE TABLE t1 (
ts timestamp time index,
val double,
host string primary key
) engine = metric with ("on_physical_table" = "phy");
Affected Rows: 0
INSERT INTO
t1
VALUES
('host1', 0, 1),
('host2', 1, 0,);
Affected Rows: 2
SELECT
*
FROM
t1;
+-------+-------------------------+-----+
| host | ts | val |
+-------+-------------------------+-----+
| host2 | 1970-01-01T00:00:00.001 | 0.0 |
| host1 | 1970-01-01T00:00:00 | 1.0 |
+-------+-------------------------+-----+
CREATE TABLE t2 (
ts timestamp time index,
job string primary key,
val double
) engine = metric with ("on_physical_table" = "phy");
Affected Rows: 0
ALTER TABLE
t1
ADD
COLUMN `at` STRING;
Affected Rows: 0
ALTER TABLE
t2
ADD
COLUMN at3 STRING;
Affected Rows: 0
ALTER TABLE
t2
ADD
COLUMN `at` STRING;
Affected Rows: 0
ALTER TABLE
t2
ADD
COLUMN at2 STRING;
Affected Rows: 0
INSERT INTO
t2
VALUES
("loc_1", "loc_2", "loc_3", 'job1', 0, 1);
Affected Rows: 1
SELECT
*
FROM
t2;
+-------+-------+-------+------+---------------------+-----+
| at | at2 | at3 | job | ts | val |
+-------+-------+-------+------+---------------------+-----+
| loc_1 | loc_2 | loc_3 | job1 | 1970-01-01T00:00:00 | 1.0 |
+-------+-------+-------+------+---------------------+-----+
DROP TABLE t1;
Affected Rows: 0
DROP TABLE t2;
Affected Rows: 0
DROP TABLE phy;
Affected Rows: 0

View File

@@ -20,3 +20,65 @@ ALTER TABLE test_alt_table ADD COLUMN m INTEGER;
DESC TABLE test_alt_table;
DROP TABLE test_alt_table;
-- to test if same name column can be added
CREATE TABLE phy (ts timestamp time index, val double) engine = metric with ("physical_metric_table" = "");
CREATE TABLE t1 (
ts timestamp time index,
val double,
host string primary key
) engine = metric with ("on_physical_table" = "phy");
INSERT INTO
t1
VALUES
('host1', 0, 1),
('host2', 1, 0,);
SELECT
*
FROM
t1;
CREATE TABLE t2 (
ts timestamp time index,
job string primary key,
val double
) engine = metric with ("on_physical_table" = "phy");
ALTER TABLE
t1
ADD
COLUMN `at` STRING;
ALTER TABLE
t2
ADD
COLUMN at3 STRING;
ALTER TABLE
t2
ADD
COLUMN `at` STRING;
ALTER TABLE
t2
ADD
COLUMN at2 STRING;
INSERT INTO
t2
VALUES
("loc_1", "loc_2", "loc_3", 'job1', 0, 1);
SELECT
*
FROM
t2;
DROP TABLE t1;
DROP TABLE t2;
DROP TABLE phy;