fix(metric-engine): validate column types and require time index in verify_rows (#8018)

* fix(metric-engine): validate column types and require time index in verify_rows

The remote-write path into the metric engine previously bypassed schema
validation. When a row's time index column carried a non-timestamp
datatype (e.g. a string), the request reached mito's ValueBuilder::push
for the timestamp builder and panicked instead of surfacing a typed
error.

Cache the (column_id, data_type, semantic_type) tuple for each physical
column on PhysicalRegionState and use it in verify_rows to:

- reject columns whose datatype or semantic type disagrees with the
  physical region's schema (mirrors mito's WriteRequest::check_schema)
- reject requests that omit the time index column entirely

Field columns stay optional; tag completeness needs per-logical-region
metadata that verify_rows doesn't have and is left to a follow-up.

Fixes #7990.

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>

* refactor(metric-engine): simplify PhysicalColumnInfo construction

- Add From<ColumnMetadata> and From<&ColumnMetadata> for PhysicalColumnInfo
  so call sites can use metadata.into() instead of repeating the field list.
- Replace the four struct-literal constructions in create.rs, open.rs and
  alter.rs with the conversion.
- In verify_rows, pass &col.column_name to ColumnNotFoundSnafu instead of
  cloning it explicitly (snafu's context handles the conversion).

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>

* perf(metric-engine): cache time index column name in PhysicalRegionState

verify_rows previously scanned every physical column on each row batch to
find the timestamp column. Since the time index is fixed at region
creation and never changes, stash its name on PhysicalRegionState when
the region is first registered and read it directly from there.

add_physical_columns carries a debug_assert to document the invariant
that alter never introduces a new time index.

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>

* perf(metric-engine): borrow physical column names when building name_to_id

On the row-write path we built a HashMap<String, ColumnId> by cloning
every column name out of the physical region's cached state. The map is
scoped to the block that holds the state's read guard, so there's no
need to own the keys.

Switch the map to HashMap<&str, ColumnId> and widen RowsIter::new /
IterIndex::new to accept any key type that borrows as str. Existing
test helpers that pass HashMap<String, ColumnId> keep working through
the Borrow<str> bound.

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>

* fix: validate metric rows against physical schema

Cache physical column metadata in the metric engine state so row validation and row modification can use the same source of truth for column IDs, data types, and semantic types.

Validate incoming metric rows against the physical schema before writes. Put requests now require the time index and the expected field column, while delete requests keep accepting primary-key-plus-timestamp payloads by skipping the field completeness check.

Pass physical column metadata directly into RowsIter instead of rebuilding a name-to-column-id map at each call site, and cover the new validation paths with tests for missing time indexes, missing fields, and duplicate field columns.

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: do not allow adding a new field

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fill default value for fields

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fill default for nullable fields

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: BootstrapperSBL <yvanwww01@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Co-authored-by: BootstrapperSBL <yvanwww01@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yvan Wang
2026-05-07 20:41:07 +08:00
committed by GitHub
parent d0e0c21600
commit d1873ca31d
16 changed files with 723 additions and 93 deletions

View File

@@ -201,6 +201,7 @@ mod tests {
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::arrow::record_batch::RecordBatch;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::ColumnMetadata;
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use super::*;
@@ -364,11 +365,23 @@ mod tests {
let modified =
modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap();
let name_to_column_id: HashMap<String, ColumnId> = [
("greptime_timestamp".to_string(), 0),
("greptime_value".to_string(), 1),
("namespace".to_string(), 2),
("host".to_string(), 3),
let make_info = |name: &str, column_id: ColumnId| ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema::new(
name.to_string(),
datatypes::prelude::ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id,
};
let name_to_column_id: HashMap<String, ColumnMetadata> = [
(
"greptime_timestamp".to_string(),
make_info("greptime_timestamp", 0),
),
("greptime_value".to_string(), make_info("greptime_value", 1)),
("namespace".to_string(), make_info("namespace", 2)),
("host".to_string(), make_info("host", 3)),
]
.into_iter()
.collect();

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::SemanticType;
use common_telemetry::{debug, info, warn};
use common_telemetry::{debug, info};
use datatypes::schema::{SkippingIndexOptions, SkippingIndexType};
use mito2::engine::MitoEngine;
use snafu::ResultExt;
@@ -27,8 +27,8 @@ use store_api::storage::{ConcreteDataType, RegionId};
use crate::engine::IndexOptions;
use crate::error::{
ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
AddingFieldColumnSnafu, ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu,
MitoReadOperationSnafu, MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT};
use crate::utils;
@@ -132,10 +132,10 @@ impl DataRegion {
.fail();
}
} else {
warn!(
"Column {} in region {region_id} is not a tag",
c.column_schema.name
);
return AddingFieldColumnSnafu {
name: &c.column_schema.name,
}
.fail();
};
c.column_id = new_column_id_start + delta as u32;

View File

@@ -189,7 +189,7 @@ impl MetricEngineInner {
let new_add_columns = new_column_names.iter().map(|name| {
// Safety: previous steps ensure the physical region exist
let column_metadata = *physical_schema_map.get(name).unwrap();
(name.to_string(), column_metadata.column_id)
(name.to_string(), column_metadata.clone())
});
// Writes logical regions metadata to metadata region

View File

@@ -14,11 +14,13 @@
use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{AlterKind, RegionAlterRequest};
use store_api::storage::{ColumnId, RegionId};
use store_api::storage::RegionId;
use crate::error::Result;
use crate::error::{AddingFieldColumnSnafu, Result};
/// Extract new columns from the create requests.
///
@@ -27,7 +29,7 @@ use crate::error::Result;
/// This function will panic if the alter kind is not `AddColumns`.
pub fn extract_new_columns<'a>(
requests: &'a [(RegionId, RegionAlterRequest)],
physical_columns: &HashMap<String, ColumnId>,
physical_columns: &HashMap<String, ColumnMetadata>,
new_column_names: &mut HashSet<&'a str>,
new_columns: &mut Vec<ColumnMetadata>,
) -> Result<()> {
@@ -40,6 +42,12 @@ pub fn extract_new_columns<'a>(
if !physical_columns.contains_key(column_name)
&& !new_column_names.contains(column_name)
{
ensure!(
col.column_metadata.semantic_type != SemanticType::Field,
AddingFieldColumnSnafu {
name: column_name.to_string(),
}
);
new_column_names.insert(column_name);
// TODO(weny): avoid clone
new_columns.push(col.column_metadata.clone());
@@ -49,3 +57,55 @@ pub fn extract_new_columns<'a>(
Ok(())
}
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::ColumnMetadata;
    use store_api::region_request::{AddColumn, AlterKind, RegionAlterRequest};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;

    /// A column that is not yet known to the physical region must not be
    /// added with `SemanticType::Field`: `extract_new_columns` is expected to
    /// fail with `Error::AddingFieldColumn`.
    #[test]
    fn test_extract_new_columns_with_field_type() {
        // The offending column: brand new, and declared as a field.
        let field_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "new_column".to_string(),
                ConcreteDataType::string_datatype(),
                false,
            ),
            semantic_type: SemanticType::Field,
            column_id: 0,
        };
        let alter = RegionAlterRequest {
            kind: AlterKind::AddColumns {
                columns: vec![AddColumn {
                    column_metadata: field_column,
                    location: None,
                }],
            },
        };
        let requests = vec![(RegionId::new(1, 1), alter)];

        // The physical region has no columns, so `new_column` counts as new.
        let known_columns = HashMap::new();
        let mut seen_names = HashSet::new();
        let mut collected = Vec::new();

        let outcome = extract_new_columns(
            &requests,
            &known_columns,
            &mut seen_names,
            &mut collected,
        );

        let err = outcome.unwrap_err();
        assert!(matches!(err, Error::AddingFieldColumn { .. }));
    }
}

View File

@@ -190,13 +190,13 @@ impl MetricEngineInner {
for (index, field) in batch.schema().fields().iter().enumerate() {
let name = field.name();
let column_id =
*physical_columns
.get(name)
.with_context(|| error::ColumnNotFoundSnafu {
name: name.clone(),
region_id: logical_region_id,
})?;
let column_id = physical_columns
.get(name)
.map(|info| info.column_id)
.with_context(|| error::ColumnNotFoundSnafu {
name: name.clone(),
region_id: logical_region_id,
})?;
if tag_names.contains(name.as_str()) {
tag_columns.push(TagColumnInfo {
name: name.clone(),

View File

@@ -145,7 +145,7 @@ impl MetricEngineInner {
let physical_columns = create_data_region_request
.column_metadatas
.iter()
.map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id))
.map(|metadata| (metadata.column_schema.name.clone(), metadata.clone()))
.collect::<HashMap<_, _>>();
let time_index_unit = create_data_region_request
.column_metadatas
@@ -321,7 +321,7 @@ impl MetricEngineInner {
let new_add_columns = new_column_names.iter().map(|name| {
// Safety: previous steps ensure the physical region exist
let column_metadata = *physical_schema_map.get(name).unwrap();
(name.to_string(), column_metadata.column_id)
(name.to_string(), column_metadata.clone())
});
extension_return_value.insert(

View File

@@ -18,14 +18,14 @@ use api::v1::SemanticType;
use snafu::ensure;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::RegionCreateRequest;
use store_api::storage::{ColumnId, RegionId};
use store_api::storage::RegionId;
use crate::error::{AddingFieldColumnSnafu, Result};
/// Extract new columns from the create requests.
pub fn extract_new_columns<'a>(
requests: &'a [(RegionId, RegionCreateRequest)],
physical_columns: &HashMap<String, ColumnId>,
physical_columns: &HashMap<String, ColumnMetadata>,
new_column_names: &mut HashSet<&'a str>,
new_columns: &mut Vec<ColumnMetadata>,
) -> Result<()> {
@@ -123,7 +123,18 @@ mod tests {
];
let mut physical_columns = HashMap::new();
physical_columns.insert("existing_column".to_string(), 0);
physical_columns.insert(
"existing_column".to_string(),
ColumnMetadata {
column_schema: ColumnSchema::new(
"existing_column".to_string(),
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 0,
},
);
let mut new_column_names = HashSet::new();
let mut new_columns = Vec::new();

View File

@@ -326,7 +326,7 @@ impl MetricEngineInner {
.unwrap();
let physical_columns = physical_columns
.into_iter()
.map(|col| (col.column_schema.name, col.column_id))
.map(|col| (col.column_schema.name.clone(), col))
.collect();
state.add_physical_region(
physical_region_id,

View File

@@ -14,13 +14,16 @@
use std::collections::HashMap;
use api::helper::ColumnDataTypeWrapper;
use api::v1::{
ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, Value, WriteHint,
ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, SemanticType, Value,
WriteHint,
};
use common_telemetry::{error, info};
use fxhash::FxHashMap;
use snafu::{OptionExt, ensure};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{
AffectedRows, RegionDeleteRequest, RegionPutRequest, RegionRequest,
};
@@ -28,8 +31,9 @@ use store_api::storage::{RegionId, TableId};
use crate::engine::MetricEngineInner;
use crate::error::{
ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu,
LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu,
ColumnNotFoundSnafu, CreateDefaultSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu,
LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnexpectedRequestSnafu,
UnsupportedRegionRequestSnafu,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
use crate::row_modifier::{RowsIter, TableIdInput};
@@ -116,7 +120,7 @@ impl MetricEngineInner {
async fn put_regions_batch_single_physical(
&self,
physical_region_id: RegionId,
requests: Vec<(RegionId, RegionPutRequest)>,
mut requests: Vec<(RegionId, RegionPutRequest)>,
) -> Result<AffectedRows> {
if requests.is_empty() {
return Ok(0);
@@ -126,7 +130,7 @@ impl MetricEngineInner {
let primary_key_encoding = self.get_primary_key_encoding(data_region_id)?;
// Validate all requests
self.validate_batch_requests(physical_region_id, &requests)
self.validate_batch_requests(physical_region_id, &mut requests)
.await?;
// Merge requests according to encoding strategy
@@ -157,11 +161,16 @@ impl MetricEngineInner {
async fn validate_batch_requests(
&self,
physical_region_id: RegionId,
requests: &[(RegionId, RegionPutRequest)],
requests: &mut [(RegionId, RegionPutRequest)],
) -> Result<()> {
for (logical_region_id, request) in requests {
self.verify_rows(*logical_region_id, physical_region_id, &request.rows)
.await?;
self.verify_rows(
*logical_region_id,
physical_region_id,
&mut request.rows,
true,
)
.await?;
}
Ok(())
}
@@ -248,7 +257,7 @@ impl MetricEngineInner {
// Batch-modify all rows (add __table_id and __tsid columns)
let final_rows = {
let state = self.state.read().unwrap();
let name_to_id = state
let physical_columns = state
.physical_region_states()
.get(&data_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
@@ -261,7 +270,7 @@ impl MetricEngineInner {
schema: merged_schema,
rows: merged_rows,
},
name_to_id,
physical_columns,
);
self.row_modifier.modify_rows(
@@ -406,8 +415,13 @@ impl MetricEngineInner {
let (physical_region_id, data_region_id, primary_key_encoding) =
self.find_data_region_meta(logical_region_id)?;
self.verify_rows(logical_region_id, physical_region_id, &request.rows)
.await?;
self.verify_rows(
logical_region_id,
physical_region_id,
&mut request.rows,
true,
)
.await?;
// write to data region
// TODO: retrieve table name
@@ -439,8 +453,13 @@ impl MetricEngineInner {
let (physical_region_id, data_region_id, primary_key_encoding) =
self.find_data_region_meta(logical_region_id)?;
self.verify_rows(logical_region_id, physical_region_id, &request.rows)
.await?;
self.verify_rows(
logical_region_id,
physical_region_id,
&mut request.rows,
false,
)
.await?;
// write to data region
// TODO: retrieve table name
@@ -484,12 +503,18 @@ impl MetricEngineInner {
///
/// Includes:
/// - Check if the logical region exists
/// - Check if the columns exist
/// - Check if every column in the request exists in the physical region
/// - Check each column's datatype and semantic type match the physical region's schema
/// - Check the time index column is present
/// - When `check_fields` is true, check every physical field column is present.
/// Set this to `false` for delete requests, which legitimately carry only
/// the primary key + timestamp.
async fn verify_rows(
&self,
logical_region_id: RegionId,
physical_region_id: RegionId,
rows: &Rows,
rows: &mut Rows,
check_fields: bool,
) -> Result<()> {
// Check if the region exists
let data_region_id = to_data_region_id(physical_region_id);
@@ -502,22 +527,137 @@ impl MetricEngineInner {
.fail();
}
// Check if a physical column exists
let physical_columns = state
// Type + semantic check on every column in the request schema.
let physical_state = state
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.physical_columns();
})?;
let physical_columns = physical_state.physical_columns();
for col in &rows.schema {
ensure!(
physical_columns.contains_key(&col.column_name),
ColumnNotFoundSnafu {
name: col.column_name.clone(),
let info = physical_columns
.get(&col.column_name)
.context(ColumnNotFoundSnafu {
name: &col.column_name,
region_id: logical_region_id,
})?;
ensure!(
api::helper::is_column_type_value_eq(
col.datatype,
col.datatype_extension.clone(),
&info.column_schema.data_type
),
InvalidRequestSnafu {
region_id: logical_region_id,
reason: format!(
"column {} expect type {:?}, given: {}({})",
col.column_name,
info.column_schema.data_type,
api::v1::ColumnDataType::try_from(col.datatype)
.map(|v| v.as_str_name())
.unwrap_or("Unknown"),
col.datatype,
),
}
);
ensure!(
api::helper::is_semantic_type_eq(col.semantic_type, info.semantic_type),
InvalidRequestSnafu {
region_id: logical_region_id,
reason: format!(
"column {} expect semantic type {:?}, given: {}({})",
col.column_name,
info.semantic_type,
api::v1::SemanticType::try_from(col.semantic_type)
.map(|v| v.as_str_name())
.unwrap_or("Unknown"),
col.semantic_type,
),
}
);
}
let ts_name = physical_state.time_index_column_name();
ensure!(
rows.schema.iter().any(|col| col.column_name == ts_name),
InvalidRequestSnafu {
region_id: logical_region_id,
reason: format!("missing required time index column {ts_name}"),
}
);
if check_fields {
let field_name = physical_state.field_column_name();
if !rows.schema.iter().any(|col| col.column_name == field_name) {
let field_meta =
physical_columns
.get(field_name)
.with_context(|| ColumnNotFoundSnafu {
name: field_name,
region_id: logical_region_id,
})?;
Self::fill_missing_field_column(logical_region_id, field_name, field_meta, rows)?;
}
}
Ok(())
}
fn fill_missing_field_column(
logical_region_id: RegionId,
field_name: &str,
field_meta: &ColumnMetadata,
rows: &mut Rows,
) -> Result<()> {
ensure!(
!field_meta.column_schema.is_default_impure(),
UnexpectedRequestSnafu {
reason: format!(
"unexpected impure default value with region_id: {logical_region_id}, column: {field_name}, default_value: {:?}",
field_meta.column_schema.default_constraint(),
),
}
);
let default_value = field_meta
.column_schema
.create_default()
.context(CreateDefaultSnafu {
region_id: logical_region_id,
column: field_name,
})?
.with_context(|| InvalidRequestSnafu {
region_id: logical_region_id,
reason: format!("missing required field column {field_name}"),
})?;
let default_value = api::helper::to_grpc_value(default_value);
let (datatype, datatype_extension) =
ColumnDataTypeWrapper::try_from(field_meta.column_schema.data_type.clone())
.map_err(|e| {
InvalidRequestSnafu {
region_id: logical_region_id,
reason: format!(
"no protobuf type for field column {field_name} ({:?}): {e}",
field_meta.column_schema.data_type
),
}
.build()
})?
.to_parts();
rows.schema.push(ColumnSchema {
column_name: field_name.to_string(),
datatype: datatype as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension,
options: None,
});
for row in &mut rows.rows {
row.values.push(default_value.clone());
}
Ok(())
@@ -536,14 +676,14 @@ impl MetricEngineInner {
let input = std::mem::take(rows);
let iter = {
let state = self.state.read().unwrap();
let name_to_id = state
let physical_columns = state
.physical_region_states()
.get(&physical_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: physical_region_id,
})?
.physical_columns();
RowsIter::new(input, name_to_id)
RowsIter::new(input, physical_columns)
};
let output =
self.row_modifier
@@ -557,12 +697,17 @@ impl MetricEngineInner {
mod tests {
use std::collections::HashSet;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::utils::partition_expr_version;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::value::Value as PartitionValue;
use partition::expr::col;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
@@ -1258,4 +1403,271 @@ mod tests {
.unwrap();
assert_eq!(response.affected_rows, 3);
}
/// Regression test for issue #7990: a put whose time index column is declared
/// with a non-timestamp datatype must come back from the metric engine as a
/// typed error instead of panicking inside mito's `ValueBuilder::push`.
#[tokio::test]
async fn test_verify_rows_rejects_wrong_type() {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, SemanticType};
    use common_query::prelude::{greptime_timestamp, greptime_value};

    // Small builders so the payload below reads like a table.
    let column = |name: String, datatype: ColumnDataType, semantic_type: SemanticType| {
        PbColumnSchema {
            column_name: name,
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            datatype_extension: None,
            options: None,
        }
    };
    let cell = |data: ValueData| Value {
        value_data: Some(data),
    };

    let env = TestEnv::new().await;
    env.init_metric_region().await;
    let logical_region_id = env.default_logical_region_id();

    // The timestamp column is declared as String: the payload shape that
    // triggered #7990.
    let schema = vec![
        column(
            greptime_timestamp().to_string(),
            ColumnDataType::String,
            SemanticType::Timestamp,
        ),
        column(
            greptime_value().to_string(),
            ColumnDataType::Float64,
            SemanticType::Field,
        ),
        column("job".to_string(), ColumnDataType::String, SemanticType::Tag),
    ];
    let rows = vec![Row {
        values: vec![
            cell(ValueData::StringValue("not-a-timestamp".to_string())),
            cell(ValueData::F64Value(1.0)),
            cell(ValueData::StringValue("tag_0".to_string())),
        ],
    }];

    let put = RegionRequest::Put(RegionPutRequest {
        rows: Rows { schema, rows },
        hint: None,
        partition_expr_version: None,
    });
    let err = env
        .metric()
        .handle_request(logical_region_id, put)
        .await
        .unwrap_err();

    assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}
/// A put that leaves out the time index column must be rejected up front:
/// mito cannot default-fill a `TimeIndex` column and would previously panic
/// on the empty builder.
#[tokio::test]
async fn test_verify_rows_rejects_missing_time_index() {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, SemanticType};
    use common_query::prelude::greptime_value;

    // Small builders so the payload below reads like a table.
    let column = |name: String, datatype: ColumnDataType, semantic_type: SemanticType| {
        PbColumnSchema {
            column_name: name,
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            datatype_extension: None,
            options: None,
        }
    };
    let cell = |data: ValueData| Value {
        value_data: Some(data),
    };

    let env = TestEnv::new().await;
    env.init_metric_region().await;
    let logical_region_id = env.default_logical_region_id();

    // Only the field and one tag are sent; there is no timestamp column.
    let schema = vec![
        column(
            greptime_value().to_string(),
            ColumnDataType::Float64,
            SemanticType::Field,
        ),
        column("job".to_string(), ColumnDataType::String, SemanticType::Tag),
    ];
    let rows = vec![Row {
        values: vec![
            cell(ValueData::F64Value(1.0)),
            cell(ValueData::StringValue("tag_0".to_string())),
        ],
    }];

    let put = RegionRequest::Put(RegionPutRequest {
        rows: Rows { schema, rows },
        hint: None,
        partition_expr_version: None,
    });
    let err = env
        .metric()
        .handle_request(logical_region_id, put)
        .await
        .unwrap_err();

    assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}
/// A put that carries the timestamp and a tag but no field column must fail
/// with the "missing required field column" rejection and an
/// `InvalidArguments` status.
///
/// NOTE(review): this presumably relies on the test region's field column
/// having no usable default (the expected message is the one produced when no
/// default value can be created) -- confirm against `TestEnv`.
#[tokio::test]
async fn test_verify_rows_rejects_missing_field() {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, SemanticType};
    use common_query::prelude::greptime_timestamp;

    // Small builders so the payload below reads like a table.
    let column = |name: String, datatype: ColumnDataType, semantic_type: SemanticType| {
        PbColumnSchema {
            column_name: name,
            datatype: datatype as i32,
            semantic_type: semantic_type as i32,
            datatype_extension: None,
            options: None,
        }
    };
    let cell = |data: ValueData| Value {
        value_data: Some(data),
    };

    let env = TestEnv::new().await;
    env.init_metric_region().await;
    let logical_region_id = env.default_logical_region_id();

    // Timestamp + tag only; the field column is deliberately left out.
    let schema = vec![
        column(
            greptime_timestamp().to_string(),
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        column("job".to_string(), ColumnDataType::String, SemanticType::Tag),
    ];
    let rows = vec![Row {
        values: vec![
            cell(ValueData::TimestampMillisecondValue(0)),
            cell(ValueData::StringValue("tag_0".to_string())),
        ],
    }];

    let put = RegionRequest::Put(RegionPutRequest {
        rows: Rows { schema, rows },
        hint: None,
        partition_expr_version: None,
    });
    let err = env
        .metric()
        .handle_request(logical_region_id, put)
        .await
        .unwrap_err();

    let message = err.to_string();
    assert!(
        message.contains("missing required field column"),
        "expected field-completeness rejection, got: {message}"
    );
    assert_eq!(err.status_code(), StatusCode::InvalidArguments);
}
/// A missing field column that is nullable and has no default constraint is
/// back-filled: one schema entry is appended and every row gains a null value.
///
/// Besides the column name, the test pins the protobuf datatype and semantic
/// type of the appended schema entry, so a regression that writes the wrong
/// type for the filled column is caught here rather than downstream.
#[test]
fn test_fill_missing_field_column_nullable_no_default() {
    let field_meta = ColumnMetadata {
        column_id: 1,
        semantic_type: SemanticType::Field,
        column_schema: ColumnSchema::new(
            "greptime_value".to_string(),
            ConcreteDataType::float64_datatype(),
            true, // nullable, no default
        ),
    };
    let mut rows = Rows {
        schema: vec![PbColumnSchema {
            column_name: "ts".to_string(),
            datatype: ColumnDataType::TimestampMillisecond as i32,
            semantic_type: SemanticType::Timestamp as _,
            datatype_extension: None,
            options: None,
        }],
        rows: vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::TimestampMillisecondValue(0)),
            }],
        }],
    };

    MetricEngineInner::fill_missing_field_column(
        RegionId::new(1, 1),
        "greptime_value",
        &field_meta,
        &mut rows,
    )
    .unwrap();

    // Exactly one schema entry was appended, describing the field column.
    assert_eq!(rows.schema.len(), 2);
    assert_eq!(rows.schema[1].column_name, "greptime_value");
    assert_eq!(rows.schema[1].datatype, ColumnDataType::Float64 as i32);
    assert_eq!(rows.schema[1].semantic_type, SemanticType::Field as i32);

    // The single row gained one value, and that value is null.
    assert_eq!(rows.rows[0].values.len(), 2);
    assert!(
        rows.rows[0].values[1].value_data.is_none(),
        "missing nullable field should be filled with null"
    );
}
/// A field column whose default is an impure function (`now()`) must not be
/// back-filled: `fill_missing_field_column` has to return an error that
/// mentions the impure default.
///
/// The test also checks that the request is left untouched on this error
/// path, i.e. no schema entry or row value was appended before the rejection.
#[test]
fn test_fill_missing_field_column_rejects_impure_default() {
    let field_meta = ColumnMetadata {
        column_id: 1,
        semantic_type: SemanticType::Field,
        column_schema: ColumnSchema::new(
            "greptime_value".to_string(),
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        )
        .with_default_constraint(Some(ColumnDefaultConstraint::Function("now()".to_string())))
        .unwrap(),
    };
    let mut rows = Rows {
        schema: vec![PbColumnSchema {
            column_name: "ts".to_string(),
            datatype: ColumnDataType::TimestampMillisecond as i32,
            semantic_type: SemanticType::Timestamp as _,
            datatype_extension: None,
            options: None,
        }],
        rows: vec![Row {
            values: vec![Value {
                value_data: Some(ValueData::TimestampMillisecondValue(0)),
            }],
        }],
    };

    let err = MetricEngineInner::fill_missing_field_column(
        RegionId::new(1, 1),
        "greptime_value",
        &field_meta,
        &mut rows,
    )
    .unwrap_err();

    assert!(
        err.to_string().contains("impure default value"),
        "expected impure-default rejection, got: {err}"
    );

    // The rejection happens before any mutation, so the payload is unchanged.
    assert_eq!(rows.schema.len(), 1);
    assert_eq!(rows.rows[0].values.len(), 1);
}
}

View File

@@ -16,11 +16,13 @@
use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use common_telemetry::warn;
use common_time::timestamp::TimeUnit;
use snafu::OptionExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::ColumnMetadata;
use store_api::storage::{ColumnId, RegionId};
use store_api::storage::RegionId;
use crate::engine::options::PhysicalRegionOptions;
use crate::error::{PhysicalRegionNotFoundSnafu, Result};
@@ -29,7 +31,16 @@ use crate::utils::to_data_region_id;
pub struct PhysicalRegionState {
logical_regions: HashSet<RegionId>,
physical_columns: HashMap<String, ColumnId>,
physical_columns: HashMap<String, ColumnMetadata>,
/// Name of the time index column, cached at region load so that the write
/// path doesn't have to scan `physical_columns` for the timestamp on every
/// row batch. The time index is fixed at region creation and never
/// changes, so this stays in sync with `physical_columns`.
time_index_column_name: String,
/// Name of the field column. Metric regions have exactly one field column
/// verified at creation time, so the write path can validate completeness
/// without consulting per-logical-region metadata.
field_column_name: String,
primary_key_encoding: PrimaryKeyEncoding,
options: PhysicalRegionOptions,
time_index_unit: TimeUnit,
@@ -37,14 +48,29 @@ pub struct PhysicalRegionState {
impl PhysicalRegionState {
pub fn new(
physical_columns: HashMap<String, ColumnId>,
physical_columns: HashMap<String, ColumnMetadata>,
primary_key_encoding: PrimaryKeyEncoding,
options: PhysicalRegionOptions,
time_index_unit: TimeUnit,
) -> Self {
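// Safety: a valid physical region always has exactly one time index
// column; callers validate this before reaching here (see
// `create_data_region_request` and the open path). Should that invariant
// ever be broken, the lookups below fall back to an empty name instead of
// panicking.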
let time_index_column_name = physical_columns
.iter()
.find(|(_, meta)| meta.semantic_type == SemanticType::Timestamp)
.map(|(name, _)| name.clone())
.unwrap_or_default();
let field_column_name = physical_columns
.iter()
.find(|(_, meta)| meta.semantic_type == SemanticType::Field)
.map(|(name, _)| name.clone())
.unwrap_or_default();
Self {
logical_regions: HashSet::new(),
physical_columns,
time_index_column_name,
field_column_name,
primary_key_encoding,
options,
time_index_unit,
@@ -57,10 +83,20 @@ impl PhysicalRegionState {
}
/// Returns a reference to the physical columns.
pub fn physical_columns(&self) -> &HashMap<String, ColumnId> {
pub fn physical_columns(&self) -> &HashMap<String, ColumnMetadata> {
&self.physical_columns
}
/// Returns the cached name of the time index column.
///
/// The name is computed once in `PhysicalRegionState::new` as the name of the
/// physical column whose semantic type is `Timestamp`. If no such column was
/// supplied at construction, the cached value is an empty string.
pub fn time_index_column_name(&self) -> &str {
&self.time_index_column_name
}
/// Returns the cached name of the field column.
///
/// The name is computed once in `PhysicalRegionState::new` as the name of a
/// physical column whose semantic type is `Field`. If no such column was
/// supplied at construction, the cached value is an empty string. The lookup
/// iterates a `HashMap`, so if several field columns were present, which one
/// is cached would be unspecified.
pub fn field_column_name(&self) -> &str {
&self.field_column_name
}
/// Returns a reference to the physical region options.
pub fn options(&self) -> &PhysicalRegionOptions {
&self.options
@@ -90,7 +126,7 @@ impl MetricEngineState {
pub fn add_physical_region(
&mut self,
physical_region_id: RegionId,
physical_columns: HashMap<String, ColumnId>,
physical_columns: HashMap<String, ColumnMetadata>,
primary_key_encoding: PrimaryKeyEncoding,
options: PhysicalRegionOptions,
time_index_unit: TimeUnit,
@@ -112,12 +148,25 @@ impl MetricEngineState {
pub fn add_physical_columns(
&mut self,
physical_region_id: RegionId,
physical_columns: impl IntoIterator<Item = (String, ColumnId)>,
physical_columns: impl IntoIterator<Item = (String, ColumnMetadata)>,
) {
let physical_region_id = to_data_region_id(physical_region_id);
let state = self.physical_regions.get_mut(&physical_region_id).unwrap();
for (col, id) in physical_columns {
state.physical_columns.insert(col, id);
for (col, meta) in physical_columns {
// The time index is fixed at region creation and alter cannot add
// a new one; assert that in debug builds so the cached name cannot go stale.
debug_assert_ne!(
meta.semantic_type,
SemanticType::Timestamp,
"unexpected time index column {col} added to an existing physical region"
);
if meta.semantic_type == SemanticType::Field {
warn!(
"Unexpected field column {col} added to physical region {physical_region_id}; cached field column remains {}",
state.field_column_name
);
}
state.physical_columns.insert(col, meta);
}
}

View File

@@ -343,6 +343,19 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Failed to create default value for column {} of region {}",
column,
region_id
))]
CreateDefault {
region_id: RegionId,
column: String,
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unexpected request: {}", reason))]
UnexpectedRequest {
reason: String,
@@ -394,7 +407,8 @@ impl ErrorExt for Error {
| UnsupportedAlterKind { .. }
| UnsupportedRemapManifestsRequest { .. }
| UnsupportedSyncRegionFromRequest { .. }
| InvalidRequest { .. } => StatusCode::InvalidArguments,
| InvalidRequest { .. }
| CreateDefault { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. }
| UnsupportedRegionRequest { .. }

View File

@@ -23,6 +23,7 @@ use mito_codec::row_converter::SparsePrimaryKeyCodec;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
@@ -264,7 +265,10 @@ struct IterIndex {
}
impl IterIndex {
fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap<String, ColumnId>) -> Self {
fn new(
row_schema: &[ColumnSchema],
physical_columns: &HashMap<String, ColumnMetadata>,
) -> Self {
let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new();
// Uses BTreeMap to keep the primary key column name order (lexicographical)
let mut primary_key_indices = BTreeMap::new();
@@ -290,7 +294,10 @@ impl IterIndex {
primary_key_indices.insert(
col.column_name.as_str(),
ValueIndex {
column_id: *name_to_column_id.get(&col.column_name).unwrap(),
column_id: physical_columns
.get(&col.column_name)
.unwrap()
.column_id,
index: idx,
},
);
@@ -298,13 +305,13 @@ impl IterIndex {
},
SemanticType::Field => {
field_indices.push(ValueIndex {
column_id: *name_to_column_id.get(&col.column_name).unwrap(),
column_id: physical_columns.get(&col.column_name).unwrap().column_id,
index: idx,
});
}
SemanticType::Timestamp => {
ts_index = Some(ValueIndex {
column_id: *name_to_column_id.get(&col.column_name).unwrap(),
column_id: physical_columns.get(&col.column_name).unwrap().column_id,
index: idx,
});
}
@@ -338,8 +345,8 @@ pub struct RowsIter {
}
impl RowsIter {
pub fn new(rows: Rows, name_to_column_id: &HashMap<String, ColumnId>) -> Self {
let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id);
/// Wraps `rows` in a `RowsIter`, building the column index from the row
/// schema and the physical column metadata keyed by column name.
pub fn new(rows: Rows, physical_columns: &HashMap<String, ColumnMetadata>) -> Self {
    Self {
        // Evaluated first: the schema is only borrowed for the duration of
        // this call, so `rows` can be moved into the struct right after.
        index: IterIndex::new(&rows.schema, physical_columns),
        rows,
    }
}
@@ -455,8 +462,23 @@ mod tests {
}
}
fn test_name_to_column_id() -> HashMap<String, ColumnId> {
HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)])
/// Test helper: builds metadata for a non-nullable string tag column with
/// the given name and column id.
fn make_info(name: &str, column_id: ColumnId) -> ColumnMetadata {
    // Fully qualified paths are kept as in the original helper.
    let column_schema = datatypes::schema::ColumnSchema::new(
        name.to_string(),
        datatypes::prelude::ConcreteDataType::string_datatype(),
        false,
    );
    ColumnMetadata {
        column_schema,
        semantic_type: SemanticType::Tag,
        column_id,
    }
}
/// Test fixture: physical column metadata for the `namespace` (id 1) and
/// `host` (id 2) tag columns, keyed by column name.
fn test_name_to_column_id() -> HashMap<String, ColumnMetadata> {
    let mut columns = HashMap::with_capacity(2);
    for (name, column_id) in [("namespace", 1), ("host", 2)] {
        columns.insert(name.to_string(), make_info(name, column_id));
    }
    columns
}
#[test]
@@ -657,11 +679,11 @@ mod tests {
}
/// Helper function to create a name_to_column_id map
fn create_name_to_column_id(labels: &[&str]) -> HashMap<String, ColumnId> {
fn create_name_to_column_id(labels: &[&str]) -> HashMap<String, ColumnMetadata> {
// Ids follow the order of `labels` and start at 1 (enumerate index + 1).
labels
.iter()
.enumerate()
.map(|(idx, name)| (name.to_string(), idx as ColumnId + 1))
.map(|(idx, name)| (name.to_string(), make_info(name, idx as ColumnId + 1)))
.collect()
}
@@ -692,7 +714,7 @@ mod tests {
fn extract_tsid(
schema: Vec<ColumnSchema>,
row: Row,
name_to_column_id: &HashMap<String, ColumnId>,
name_to_column_id: &HashMap<String, ColumnMetadata>,
table_id: TableId,
) -> u64 {
let rows = Rows {

View File

@@ -508,42 +508,42 @@ Affected Rows: 0
ALTER TABLE
t1
ADD
COLUMN `at` STRING;
COLUMN `at` STRING PRIMARY KEY;
Affected Rows: 0
ALTER TABLE
t2
ADD
COLUMN at3 STRING;
COLUMN at3 STRING PRIMARY KEY;
Affected Rows: 0
ALTER TABLE
t2
ADD
COLUMN `at` STRING;
COLUMN `at` STRING PRIMARY KEY;
Affected Rows: 0
ALTER TABLE
t2
ADD
COLUMN at2 STRING;
COLUMN at2 STRING PRIMARY KEY;
Affected Rows: 0
ALTER TABLE
t2
ADD
COLUMN at4 UINT16;
COLUMN at4 STRING PRIMARY KEY;
Affected Rows: 0
INSERT INTO
t2
VALUES
("loc_1", "loc_2", "loc_3", 2, 'job1', 0, 1);
("loc_1", "loc_2", "loc_3", "loc_4", 'job1', 0, 1);
Affected Rows: 1
@@ -552,11 +552,11 @@ SELECT
FROM
t2;
+-------+-------+-------+-----+------+---------------------+-----+
| at | at2 | at3 | at4 | job | ts | val |
+-------+-------+-------+-----+------+---------------------+-----+
| loc_1 | loc_2 | loc_3 | 2 | job1 | 1970-01-01T00:00:00 | 1.0 |
+-------+-------+-------+-----+------+---------------------+-----+
+-------+-------+-------+-------+------+---------------------+-----+
| at | at2 | at3 | at4 | job | ts | val |
+-------+-------+-------+-------+------+---------------------+-----+
| loc_1 | loc_2 | loc_3 | loc_4 | job1 | 1970-01-01T00:00:00 | 1.0 |
+-------+-------+-------+-------+------+---------------------+-----+
DROP TABLE t1;

View File

@@ -184,32 +184,32 @@ CREATE TABLE t2 (
ALTER TABLE
t1
ADD
COLUMN `at` STRING;
COLUMN `at` STRING PRIMARY KEY;
ALTER TABLE
t2
ADD
COLUMN at3 STRING;
COLUMN at3 STRING PRIMARY KEY;
ALTER TABLE
t2
ADD
COLUMN `at` STRING;
COLUMN `at` STRING PRIMARY KEY;
ALTER TABLE
t2
ADD
COLUMN at2 STRING;
COLUMN at2 STRING PRIMARY KEY;
ALTER TABLE
t2
ADD
COLUMN at4 UINT16;
COLUMN at4 STRING PRIMARY KEY;
INSERT INTO
t2
VALUES
("loc_1", "loc_2", "loc_3", 2, 'job1', 0, 1);
("loc_1", "loc_2", "loc_3", "loc_4", 'job1', 0, 1);
SELECT
*

View File

@@ -77,6 +77,40 @@ DROP TABLE phy;
Affected Rows: 0
CREATE TABLE phy_default (ts timestamp time index, val double default 42) engine=metric with ("physical_metric_table" = "");
Affected Rows: 0
CREATE TABLE t_default (ts timestamp time index, val double default 42, host string primary key) engine = metric with ("on_physical_table" = "phy_default");
Affected Rows: 0
INSERT INTO t_default (host, ts) VALUES ('host1', 0), ('host2', 1);
Affected Rows: 2
SELECT host, ts, val FROM t_default ORDER BY host;
+-------+-------------------------+------+
| host | ts | val |
+-------+-------------------------+------+
| host1 | 1970-01-01T00:00:00 | 42.0 |
| host2 | 1970-01-01T00:00:00.001 | 42.0 |
+-------+-------------------------+------+
-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region
INSERT INTO t_default (host, val) VALUES ('host3', 3);
Error: 1004(InvalidArguments), Invalid request for region, reason: missing required time index column ts
DROP TABLE t_default;
Affected Rows: 0
DROP TABLE phy_default;
Affected Rows: 0
CREATE TABLE phy (
ts timestamp time index,
val double

View File

@@ -24,6 +24,21 @@ SELECT ts, val, __tsid, host, job FROM phy;
DROP TABLE phy;
CREATE TABLE phy_default (ts timestamp time index, val double default 42) engine=metric with ("physical_metric_table" = "");
CREATE TABLE t_default (ts timestamp time index, val double default 42, host string primary key) engine = metric with ("on_physical_table" = "phy_default");
INSERT INTO t_default (host, ts) VALUES ('host1', 0), ('host2', 1);
SELECT host, ts, val FROM t_default ORDER BY host;
-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region
INSERT INTO t_default (host, val) VALUES ('host3', 3);
DROP TABLE t_default;
DROP TABLE phy_default;
CREATE TABLE phy (
ts timestamp time index,
val double