From d1873ca31dc7ee4d4220d44ca6d756106a848cd2 Mon Sep 17 00:00:00 2001 From: Yvan Wang <131545713+BootstrapperSBL@users.noreply.github.com> Date: Thu, 7 May 2026 20:41:07 +0800 Subject: [PATCH] fix(metric-engine): validate column types and require time index in verify_rows (#8018) * fix(metric-engine): validate column types and require time index in verify_rows The remote-write path into the metric engine previously bypassed schema validation. When a row's time index column carried a non-timestamp datatype (e.g. a string), the request reached mito's ValueBuilder::push for the timestamp builder and panicked instead of surfacing a typed error. Cache the (column_id, data_type, semantic_type) tuple for each physical column on PhysicalRegionState and use it in verify_rows to: - reject columns whose datatype or semantic type disagrees with the physical region's schema (mirrors mito's WriteRequest::check_schema) - reject requests that omit the time index column entirely Field columns stay optional; tag completeness needs per-logical-region metadata that verify_rows doesn't have and is left to a follow-up. Fixes #7990. Signed-off-by: BootstrapperSBL * refactor(metric-engine): simplify PhysicalColumnInfo construction - Add From and From<&ColumnMetadata> for PhysicalColumnInfo so call sites can use metadata.into() instead of repeating the field list. - Replace the four struct-literal constructions in create.rs, open.rs and alter.rs with the conversion. - In verify_rows, pass &col.column_name to ColumnNotFoundSnafu instead of cloning it explicitly (snafu's context handles the conversion). Signed-off-by: BootstrapperSBL * perf(metric-engine): cache time index column name in PhysicalRegionState verify_rows previously scanned every physical column on each row batch to find the timestamp column. Since the time index is fixed at region creation and never changes, stash its name on PhysicalRegionState when the region is first registered and read it directly from there. add_physical_columns carries a debug_assert to document the invariant that alter never introduces a new time index. Signed-off-by: BootstrapperSBL * perf(metric-engine): borrow physical column names when building name_to_id On the row-write path we built a HashMap by cloning every column name out of the physical region's cached state. The map is scoped to the block that holds the state's read guard, so there's no need to own the keys. Switch the map to HashMap<&str, ColumnId> and widen RowsIter::new / IterIndex::new to accept any key type that borrows as str. Existing test helpers that pass HashMap keep working through the Borrow bound. Signed-off-by: BootstrapperSBL * fix: validate metric rows against physical schema Cache physical column metadata in the metric engine state so row validation and row modification can use the same source of truth for column IDs, data types, and semantic types. Validate incoming metric rows against the physical schema before writes. Put requests now require the time index and the expected field column, while delete requests keep accepting primary-key-plus-timestamp payloads by skipping the field completeness check. Pass physical column metadata directly into RowsIter instead of rebuilding a name-to-column-id map at each call site, and cover the new validation paths with tests for missing time indexes, missing fields, and duplicate field columns. Signed-off-by: evenyag * fix: do not allow adding a new field Signed-off-by: evenyag * fix: fill default value for fields Signed-off-by: evenyag * fix: fill default for nullable fields Signed-off-by: evenyag --------- Signed-off-by: BootstrapperSBL Signed-off-by: evenyag Co-authored-by: BootstrapperSBL Co-authored-by: evenyag --- src/metric-engine/src/batch_modifier.rs | 23 +- src/metric-engine/src/data_region.rs | 14 +- src/metric-engine/src/engine/alter.rs | 2 +- .../src/engine/alter/extract_new_columns.rs | 66 ++- src/metric-engine/src/engine/bulk_insert.rs | 14 +- src/metric-engine/src/engine/create.rs | 4 +- .../src/engine/create/extract_new_columns.rs | 17 +- src/metric-engine/src/engine/open.rs | 2 +- src/metric-engine/src/engine/put.rs | 466 +++++++++++++++++- src/metric-engine/src/engine/state.rs | 65 ++- src/metric-engine/src/error.rs | 16 +- src/metric-engine/src/row_modifier.rs | 44 +- .../common/alter/alter_table.result | 22 +- .../standalone/common/alter/alter_table.sql | 12 +- .../common/insert/logical_metric_table.result | 34 ++ .../common/insert/logical_metric_table.sql | 15 + 16 files changed, 723 insertions(+), 93 deletions(-) diff --git a/src/metric-engine/src/batch_modifier.rs b/src/metric-engine/src/batch_modifier.rs index 76d9bb418a..2162215d1b 100644 --- a/src/metric-engine/src/batch_modifier.rs +++ b/src/metric-engine/src/batch_modifier.rs @@ -201,6 +201,7 @@ mod tests { use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datatypes::arrow::record_batch::RecordBatch; use store_api::codec::PrimaryKeyEncoding; + use store_api::metadata::ColumnMetadata; use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; use super::*; @@ -364,11 +365,23 @@ mod tests { let modified = modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap(); - let name_to_column_id: HashMap = [ - ("greptime_timestamp".to_string(), 0), - ("greptime_value".to_string(), 1), - ("namespace".to_string(), 2), - ("host".to_string(), 3), + let make_info = |name: &str, column_id: ColumnId| ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + name.to_string(), + datatypes::prelude::ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id, + }; + let name_to_column_id: HashMap = [ + ( + "greptime_timestamp".to_string(), + make_info("greptime_timestamp", 0), + ), + ("greptime_value".to_string(), make_info("greptime_value", 1)), + ("namespace".to_string(), make_info("namespace", 2)), + ("host".to_string(), make_info("host", 3)), ] .into_iter() .collect(); diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 9bc22e1102..b64d61a287 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::SemanticType; -use common_telemetry::{debug, info, warn}; +use common_telemetry::{debug, info}; use datatypes::schema::{SkippingIndexOptions, SkippingIndexType}; use mito2::engine::MitoEngine; use snafu::ResultExt; @@ -27,8 +27,8 @@ use store_api::storage::{ConcreteDataType, RegionId}; use crate::engine::IndexOptions; use crate::error::{ - ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu, - MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu, + AddingFieldColumnSnafu, ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, + MitoReadOperationSnafu, MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu, }; use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT}; use crate::utils; @@ -132,10 +132,10 @@ impl DataRegion { .fail(); } } else { - warn!( - "Column {} in region {region_id} is not a tag", - c.column_schema.name - ); + return AddingFieldColumnSnafu { + name: &c.column_schema.name, + } + .fail(); }; c.column_id = new_column_id_start + delta as u32; diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 4b6b67f31b..892e0c91c2 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -189,7 +189,7 @@ impl MetricEngineInner { let new_add_columns = new_column_names.iter().map(|name| { // Safety: previous steps ensure the physical region exist let column_metadata = *physical_schema_map.get(name).unwrap(); - (name.to_string(), column_metadata.column_id) + (name.to_string(), column_metadata.clone()) }); // Writes logical regions metadata to metadata region diff --git a/src/metric-engine/src/engine/alter/extract_new_columns.rs b/src/metric-engine/src/engine/alter/extract_new_columns.rs index fdb1ef6126..c3bef1ba4f 100644 --- a/src/metric-engine/src/engine/alter/extract_new_columns.rs +++ b/src/metric-engine/src/engine/alter/extract_new_columns.rs @@ -14,11 +14,13 @@ use std::collections::{HashMap, HashSet}; +use api::v1::SemanticType; +use snafu::ensure; use store_api::metadata::ColumnMetadata; use store_api::region_request::{AlterKind, RegionAlterRequest}; -use store_api::storage::{ColumnId, RegionId}; +use store_api::storage::RegionId; -use crate::error::Result; +use crate::error::{AddingFieldColumnSnafu, Result}; /// Extract new columns from the create requests. /// @@ -27,7 +29,7 @@ use crate::error::Result; /// This function will panic if the alter kind is not `AddColumns`. pub fn extract_new_columns<'a>( requests: &'a [(RegionId, RegionAlterRequest)], - physical_columns: &HashMap, + physical_columns: &HashMap, new_column_names: &mut HashSet<&'a str>, new_columns: &mut Vec, ) -> Result<()> { @@ -40,6 +42,12 @@ pub fn extract_new_columns<'a>( if !physical_columns.contains_key(column_name) && !new_column_names.contains(column_name) { + ensure!( + col.column_metadata.semantic_type != SemanticType::Field, + AddingFieldColumnSnafu { + name: column_name.to_string(), + } + ); new_column_names.insert(column_name); // TODO(weny): avoid clone new_columns.push(col.column_metadata.clone()); @@ -49,3 +57,55 @@ pub fn extract_new_columns<'a>( Ok(()) } + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + + use api::v1::SemanticType; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use store_api::metadata::ColumnMetadata; + use store_api::region_request::{AddColumn, AlterKind, RegionAlterRequest}; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + + #[test] + fn test_extract_new_columns_with_field_type() { + let requests = vec![( + RegionId::new(1, 1), + RegionAlterRequest { + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "new_column".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Field, + column_id: 0, + }, + location: None, + }], + }, + }, + )]; + + let physical_columns = HashMap::new(); + let mut new_column_names = HashSet::new(); + let mut new_columns = Vec::new(); + + let err = extract_new_columns( + &requests, + &physical_columns, + &mut new_column_names, + &mut new_columns, + ) + .unwrap_err(); + + assert!(matches!(err, Error::AddingFieldColumn { .. })); + } +} diff --git a/src/metric-engine/src/engine/bulk_insert.rs b/src/metric-engine/src/engine/bulk_insert.rs index 942dae1136..24c9e7934c 100644 --- a/src/metric-engine/src/engine/bulk_insert.rs +++ b/src/metric-engine/src/engine/bulk_insert.rs @@ -190,13 +190,13 @@ impl MetricEngineInner { for (index, field) in batch.schema().fields().iter().enumerate() { let name = field.name(); - let column_id = - *physical_columns - .get(name) - .with_context(|| error::ColumnNotFoundSnafu { - name: name.clone(), - region_id: logical_region_id, - })?; + let column_id = physical_columns + .get(name) + .map(|info| info.column_id) + .with_context(|| error::ColumnNotFoundSnafu { + name: name.clone(), + region_id: logical_region_id, + })?; if tag_names.contains(name.as_str()) { tag_columns.push(TagColumnInfo { name: name.clone(), diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 131ec1dbd4..34aeb5577c 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -145,7 +145,7 @@ impl MetricEngineInner { let physical_columns = create_data_region_request .column_metadatas .iter() - .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id)) + .map(|metadata| (metadata.column_schema.name.clone(), metadata.clone())) .collect::>(); let time_index_unit = create_data_region_request .column_metadatas @@ -321,7 +321,7 @@ impl MetricEngineInner { let new_add_columns = new_column_names.iter().map(|name| { // Safety: previous steps ensure the physical region exist let column_metadata = *physical_schema_map.get(name).unwrap(); - (name.to_string(), column_metadata.column_id) + (name.to_string(), column_metadata.clone()) }); extension_return_value.insert( diff --git a/src/metric-engine/src/engine/create/extract_new_columns.rs b/src/metric-engine/src/engine/create/extract_new_columns.rs index 9d1de9ebb2..82cc53ce3d 100644 --- a/src/metric-engine/src/engine/create/extract_new_columns.rs +++ b/src/metric-engine/src/engine/create/extract_new_columns.rs @@ -18,14 +18,14 @@ use api::v1::SemanticType; use snafu::ensure; use store_api::metadata::ColumnMetadata; use store_api::region_request::RegionCreateRequest; -use store_api::storage::{ColumnId, RegionId}; +use store_api::storage::RegionId; use crate::error::{AddingFieldColumnSnafu, Result}; /// Extract new columns from the create requests. pub fn extract_new_columns<'a>( requests: &'a [(RegionId, RegionCreateRequest)], - physical_columns: &HashMap, + physical_columns: &HashMap, new_column_names: &mut HashSet<&'a str>, new_columns: &mut Vec, ) -> Result<()> { @@ -123,7 +123,18 @@ mod tests { ]; let mut physical_columns = HashMap::new(); - physical_columns.insert("existing_column".to_string(), 0); + physical_columns.insert( + "existing_column".to_string(), + ColumnMetadata { + column_schema: ColumnSchema::new( + "existing_column".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }, + ); let mut new_column_names = HashSet::new(); let mut new_columns = Vec::new(); diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 70e3280183..59b1cfd928 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -326,7 +326,7 @@ impl MetricEngineInner { .unwrap(); let physical_columns = physical_columns .into_iter() - .map(|col| (col.column_schema.name, col.column_id)) + .map(|col| (col.column_schema.name.clone(), col)) .collect(); state.add_physical_region( physical_region_id, diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index edae0d2bb4..07adfae120 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -14,13 +14,16 @@ use std::collections::HashMap; +use api::helper::ColumnDataTypeWrapper; use api::v1::{ - ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, Value, WriteHint, + ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, SemanticType, Value, + WriteHint, }; use common_telemetry::{error, info}; use fxhash::FxHashMap; -use snafu::{OptionExt, ensure}; +use snafu::{OptionExt, ResultExt, ensure}; use store_api::codec::PrimaryKeyEncoding; +use store_api::metadata::ColumnMetadata; use store_api::region_request::{ AffectedRows, RegionDeleteRequest, RegionPutRequest, RegionRequest, }; @@ -28,8 +31,9 @@ use store_api::storage::{RegionId, TableId}; use crate::engine::MetricEngineInner; use crate::error::{ - ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu, - LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu, + ColumnNotFoundSnafu, CreateDefaultSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu, + LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnexpectedRequestSnafu, + UnsupportedRegionRequestSnafu, }; use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED}; use crate::row_modifier::{RowsIter, TableIdInput}; @@ -116,7 +120,7 @@ impl MetricEngineInner { async fn put_regions_batch_single_physical( &self, physical_region_id: RegionId, - requests: Vec<(RegionId, RegionPutRequest)>, + mut requests: Vec<(RegionId, RegionPutRequest)>, ) -> Result { if requests.is_empty() { return Ok(0); @@ -126,7 +130,7 @@ impl MetricEngineInner { let primary_key_encoding = self.get_primary_key_encoding(data_region_id)?; // Validate all requests - self.validate_batch_requests(physical_region_id, &requests) + self.validate_batch_requests(physical_region_id, &mut requests) .await?; // Merge requests according to encoding strategy @@ -157,11 +161,16 @@ impl MetricEngineInner { async fn validate_batch_requests( &self, physical_region_id: RegionId, - requests: &[(RegionId, RegionPutRequest)], + requests: &mut [(RegionId, RegionPutRequest)], ) -> Result<()> { for (logical_region_id, request) in requests { - self.verify_rows(*logical_region_id, physical_region_id, &request.rows) - .await?; + self.verify_rows( + *logical_region_id, + physical_region_id, + &mut request.rows, + true, + ) + .await?; } Ok(()) } @@ -248,7 +257,7 @@ impl MetricEngineInner { // Batch-modify all rows (add __table_id and __tsid columns) let final_rows = { let state = self.state.read().unwrap(); - let name_to_id = state + let physical_columns = state .physical_region_states() .get(&data_region_id) .with_context(|| PhysicalRegionNotFoundSnafu { @@ -261,7 +270,7 @@ impl MetricEngineInner { schema: merged_schema, rows: merged_rows, }, - name_to_id, + physical_columns, ); self.row_modifier.modify_rows( @@ -406,8 +415,13 @@ impl MetricEngineInner { let (physical_region_id, data_region_id, primary_key_encoding) = self.find_data_region_meta(logical_region_id)?; - self.verify_rows(logical_region_id, physical_region_id, &request.rows) - .await?; + self.verify_rows( + logical_region_id, + physical_region_id, + &mut request.rows, + true, + ) + .await?; // write to data region // TODO: retrieve table name @@ -439,8 +453,13 @@ impl MetricEngineInner { let (physical_region_id, data_region_id, primary_key_encoding) = self.find_data_region_meta(logical_region_id)?; - self.verify_rows(logical_region_id, physical_region_id, &request.rows) - .await?; + self.verify_rows( + logical_region_id, + physical_region_id, + &mut request.rows, + false, + ) + .await?; // write to data region // TODO: retrieve table name @@ -484,12 +503,18 @@ impl MetricEngineInner { /// /// Includes: /// - Check if the logical region exists - /// - Check if the columns exist + /// - Check if every column in the request exists in the physical region + /// - Check each column's datatype and semantic type match the physical region's schema + /// - Check the time index column is present + /// - When `check_fields` is true, check every physical field column is present. + /// Set this to `false` for delete requests, which legitimately carry only + /// the primary key + timestamp. async fn verify_rows( &self, logical_region_id: RegionId, physical_region_id: RegionId, - rows: &Rows, + rows: &mut Rows, + check_fields: bool, ) -> Result<()> { // Check if the region exists let data_region_id = to_data_region_id(physical_region_id); @@ -502,22 +527,137 @@ impl MetricEngineInner { .fail(); } - // Check if a physical column exists - let physical_columns = state + // Type + semantic check on every column in the request schema. + let physical_state = state .physical_region_states() .get(&data_region_id) .context(PhysicalRegionNotFoundSnafu { region_id: data_region_id, - })? - .physical_columns(); + })?; + let physical_columns = physical_state.physical_columns(); for col in &rows.schema { - ensure!( - physical_columns.contains_key(&col.column_name), - ColumnNotFoundSnafu { - name: col.column_name.clone(), + let info = physical_columns + .get(&col.column_name) + .context(ColumnNotFoundSnafu { + name: &col.column_name, region_id: logical_region_id, + })?; + + ensure!( + api::helper::is_column_type_value_eq( + col.datatype, + col.datatype_extension.clone(), + &info.column_schema.data_type + ), + InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!( + "column {} expect type {:?}, given: {}({})", + col.column_name, + info.column_schema.data_type, + api::v1::ColumnDataType::try_from(col.datatype) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), + col.datatype, + ), } ); + + ensure!( + api::helper::is_semantic_type_eq(col.semantic_type, info.semantic_type), + InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!( + "column {} expect semantic type {:?}, given: {}({})", + col.column_name, + info.semantic_type, + api::v1::SemanticType::try_from(col.semantic_type) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), + col.semantic_type, + ), + } + ); + } + + let ts_name = physical_state.time_index_column_name(); + ensure!( + rows.schema.iter().any(|col| col.column_name == ts_name), + InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!("missing required time index column {ts_name}"), + } + ); + + if check_fields { + let field_name = physical_state.field_column_name(); + if !rows.schema.iter().any(|col| col.column_name == field_name) { + let field_meta = + physical_columns + .get(field_name) + .with_context(|| ColumnNotFoundSnafu { + name: field_name, + region_id: logical_region_id, + })?; + Self::fill_missing_field_column(logical_region_id, field_name, field_meta, rows)?; + } + } + + Ok(()) + } + + fn fill_missing_field_column( + logical_region_id: RegionId, + field_name: &str, + field_meta: &ColumnMetadata, + rows: &mut Rows, + ) -> Result<()> { + ensure!( + !field_meta.column_schema.is_default_impure(), + UnexpectedRequestSnafu { + reason: format!( + "unexpected impure default value with region_id: {logical_region_id}, column: {field_name}, default_value: {:?}", + field_meta.column_schema.default_constraint(), + ), + } + ); + + let default_value = field_meta + .column_schema + .create_default() + .context(CreateDefaultSnafu { + region_id: logical_region_id, + column: field_name, + })? + .with_context(|| InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!("missing required field column {field_name}"), + })?; + let default_value = api::helper::to_grpc_value(default_value); + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(field_meta.column_schema.data_type.clone()) + .map_err(|e| { + InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!( + "no protobuf type for field column {field_name} ({:?}): {e}", + field_meta.column_schema.data_type + ), + } + .build() + })? + .to_parts(); + + rows.schema.push(ColumnSchema { + column_name: field_name.to_string(), + datatype: datatype as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension, + options: None, + }); + + for row in &mut rows.rows { + row.values.push(default_value.clone()); } Ok(()) @@ -536,14 +676,14 @@ impl MetricEngineInner { let input = std::mem::take(rows); let iter = { let state = self.state.read().unwrap(); - let name_to_id = state + let physical_columns = state .physical_region_states() .get(&physical_region_id) .with_context(|| PhysicalRegionNotFoundSnafu { region_id: physical_region_id, })? .physical_columns(); - RowsIter::new(input, name_to_id) + RowsIter::new(input, physical_columns) }; let output = self.row_modifier @@ -557,12 +697,17 @@ impl MetricEngineInner { mod tests { use std::collections::HashSet; + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_function::utils::partition_expr_version; use common_recordbatch::RecordBatches; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use datatypes::value::Value as PartitionValue; use partition::expr::col; + use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{ DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, @@ -1258,4 +1403,271 @@ mod tests { .unwrap(); assert_eq!(response.affected_rows, 3); } + + /// Regression test for issue #7990: the metric engine must reject a row + /// whose timestamp column carries a non-timestamp datatype, rather than + /// letting it panic inside mito's `ValueBuilder::push`. + #[tokio::test] + async fn test_verify_rows_rejects_wrong_type() { + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, SemanticType}; + use common_query::prelude::{greptime_timestamp, greptime_value}; + + let env = TestEnv::new().await; + env.init_metric_region().await; + + let logical_region_id = env.default_logical_region_id(); + + // Timestamp column is declared as String — the very payload that + // caused #7990. It should surface a typed error rather than panic. + let schema = vec![ + PbColumnSchema { + column_name: greptime_timestamp().to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, + options: None, + }, + PbColumnSchema { + column_name: greptime_value().to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as _, + datatype_extension: None, + options: None, + }, + PbColumnSchema { + column_name: "job".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ]; + let rows = vec![Row { + values: vec![ + Value { + value_data: Some(ValueData::StringValue("not-a-timestamp".to_string())), + }, + Value { + value_data: Some(ValueData::F64Value(1.0)), + }, + Value { + value_data: Some(ValueData::StringValue("tag_0".to_string())), + }, + ], + }]; + + let err = env + .metric() + .handle_request( + logical_region_id, + RegionRequest::Put(RegionPutRequest { + rows: Rows { schema, rows }, + hint: None, + partition_expr_version: None, + }), + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + /// The completeness check must reject requests that omit the time index + /// column, since mito cannot default-fill a `TimeIndex` column and would + /// previously panic on the empty builder. + #[tokio::test] + async fn test_verify_rows_rejects_missing_time_index() { + use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, SemanticType}; + use common_query::prelude::greptime_value; + + let env = TestEnv::new().await; + env.init_metric_region().await; + + let logical_region_id = env.default_logical_region_id(); + + // Payload only carries the field and a tag — no timestamp column. + let schema = vec![ + PbColumnSchema { + column_name: greptime_value().to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as _, + datatype_extension: None, + options: None, + }, + PbColumnSchema { + column_name: "job".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ]; + let rows = vec![Row { + values: vec![ + Value { + value_data: Some(api::v1::value::ValueData::F64Value(1.0)), + }, + Value { + value_data: Some(api::v1::value::ValueData::StringValue("tag_0".to_string())), + }, + ], + }]; + + let err = env + .metric() + .handle_request( + logical_region_id, + RegionRequest::Put(RegionPutRequest { + rows: Rows { schema, rows }, + hint: None, + partition_expr_version: None, + }), + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[tokio::test] + async fn test_verify_rows_rejects_missing_field() { + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, SemanticType}; + use common_query::prelude::greptime_timestamp; + + let env = TestEnv::new().await; + env.init_metric_region().await; + + let logical_region_id = env.default_logical_region_id(); + + // Schema has timestamp + tag but no field column. + let schema = vec![ + PbColumnSchema { + column_name: greptime_timestamp().to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, + options: None, + }, + PbColumnSchema { + column_name: "job".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ]; + let rows = vec![Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }, + Value { + value_data: Some(ValueData::StringValue("tag_0".to_string())), + }, + ], + }]; + + let err = env + .metric() + .handle_request( + logical_region_id, + RegionRequest::Put(RegionPutRequest { + rows: Rows { schema, rows }, + hint: None, + partition_expr_version: None, + }), + ) + .await + .unwrap_err(); + let message = err.to_string(); + assert!( + message.contains("missing required field column"), + "expected field-completeness rejection, got: {message}" + ); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_fill_missing_field_column_nullable_no_default() { + let field_meta = ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + "greptime_value".to_string(), + ConcreteDataType::float64_datatype(), + true, // nullable, no default + ), + }; + let mut rows = Rows { + schema: vec![PbColumnSchema { + column_name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, + options: None, + }], + rows: vec![Row { + values: vec![Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }], + }], + }; + + MetricEngineInner::fill_missing_field_column( + RegionId::new(1, 1), + "greptime_value", + &field_meta, + &mut rows, + ) + .unwrap(); + + assert_eq!(rows.schema.len(), 2); + assert_eq!(rows.schema[1].column_name, "greptime_value"); + assert_eq!(rows.rows[0].values.len(), 2); + assert!( + rows.rows[0].values[1].value_data.is_none(), + "missing nullable field should be filled with null" + ); + } + + #[test] + fn test_fill_missing_field_column_rejects_impure_default() { + let field_meta = ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + "greptime_value".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_default_constraint(Some(ColumnDefaultConstraint::Function("now()".to_string()))) + .unwrap(), + }; + let mut rows = Rows { + schema: vec![PbColumnSchema { + column_name: "ts".to_string(), + datatype: api::v1::ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, + options: None, + }], + rows: vec![Row { + values: vec![Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }], + }], + }; + + let err = MetricEngineInner::fill_missing_field_column( + RegionId::new(1, 1), + "greptime_value", + &field_meta, + &mut rows, + ) + .unwrap_err(); + assert!( + err.to_string().contains("impure default value"), + "expected impure-default rejection, got: {err}" + ); + } } diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 7c64758f48..25416380f1 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -16,11 +16,13 @@ use std::collections::{HashMap, HashSet}; +use api::v1::SemanticType; +use common_telemetry::warn; use common_time::timestamp::TimeUnit; use snafu::OptionExt; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::ColumnMetadata; -use store_api::storage::{ColumnId, RegionId}; +use store_api::storage::RegionId; use crate::engine::options::PhysicalRegionOptions; use crate::error::{PhysicalRegionNotFoundSnafu, Result}; @@ -29,7 +31,16 @@ use crate::utils::to_data_region_id; pub struct PhysicalRegionState { logical_regions: HashSet, - physical_columns: HashMap, + physical_columns: HashMap, + /// Name of the time index column, cached at region load so that the write + /// path doesn't have to scan `physical_columns` for the timestamp on every + /// row batch. The time index is fixed at region creation and never + /// changes, so this stays in sync with `physical_columns`. + time_index_column_name: String, + /// Name of the field column. Metric regions have exactly one field column + /// verified at creation time, so the write path can validate completeness + /// without consulting per-logical-region metadata. + field_column_name: String, primary_key_encoding: PrimaryKeyEncoding, options: PhysicalRegionOptions, time_index_unit: TimeUnit, @@ -37,14 +48,29 @@ pub struct PhysicalRegionState { impl PhysicalRegionState { pub fn new( - physical_columns: HashMap, + physical_columns: HashMap, primary_key_encoding: PrimaryKeyEncoding, options: PhysicalRegionOptions, time_index_unit: TimeUnit, ) -> Self { + // Safety: a valid physical region always has exactly one time index + // column; callers validate this before reaching here (see + // `create_data_region_request` and the open path). + let time_index_column_name = physical_columns + .iter() + .find(|(_, meta)| meta.semantic_type == SemanticType::Timestamp) + .map(|(name, _)| name.clone()) + .unwrap_or_default(); + let field_column_name = physical_columns + .iter() + .find(|(_, meta)| meta.semantic_type == SemanticType::Field) + .map(|(name, _)| name.clone()) + .unwrap_or_default(); Self { logical_regions: HashSet::new(), physical_columns, + time_index_column_name, + field_column_name, primary_key_encoding, options, time_index_unit, @@ -57,10 +83,20 @@ impl PhysicalRegionState { } /// Returns a reference to the physical columns. - pub fn physical_columns(&self) -> &HashMap { + pub fn physical_columns(&self) -> &HashMap { &self.physical_columns } + /// Returns the cached name of the time index column. + pub fn time_index_column_name(&self) -> &str { + &self.time_index_column_name + } + + /// Returns the cached name of the field column. + pub fn field_column_name(&self) -> &str { + &self.field_column_name + } + /// Returns a reference to the physical region options. pub fn options(&self) -> &PhysicalRegionOptions { &self.options @@ -90,7 +126,7 @@ impl MetricEngineState { pub fn add_physical_region( &mut self, physical_region_id: RegionId, - physical_columns: HashMap, + physical_columns: HashMap, primary_key_encoding: PrimaryKeyEncoding, options: PhysicalRegionOptions, time_index_unit: TimeUnit, @@ -112,12 +148,25 @@ impl MetricEngineState { pub fn add_physical_columns( &mut self, physical_region_id: RegionId, - physical_columns: impl IntoIterator, + physical_columns: impl IntoIterator, ) { let physical_region_id = to_data_region_id(physical_region_id); let state = self.physical_regions.get_mut(&physical_region_id).unwrap(); - for (col, id) in physical_columns { - state.physical_columns.insert(col, id); + for (col, meta) in physical_columns { + // The time index is fixed at region creation and alter cannot add + // a new one; keep the cached name in sync defensively. + debug_assert_ne!( + meta.semantic_type, + SemanticType::Timestamp, + "unexpected time index column {col} added to an existing physical region" + ); + if meta.semantic_type == SemanticType::Field { + warn!( + "Unexpected field column {col} added to physical region {physical_region_id}; cached field column remains {}", + state.field_column_name + ); + } + state.physical_columns.insert(col, meta); } } diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 6fb3406826..284b1b0298 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -343,6 +343,19 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to create default value for column {} of region {}", + column, + region_id + ))] + CreateDefault { + region_id: RegionId, + column: String, + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unexpected request: {}", reason))] UnexpectedRequest { reason: String, @@ -394,7 +407,8 @@ impl ErrorExt for Error { | UnsupportedAlterKind { .. } | UnsupportedRemapManifestsRequest { .. } | UnsupportedSyncRegionFromRequest { .. } - | InvalidRequest { .. } => StatusCode::InvalidArguments, + | InvalidRequest { .. } + | CreateDefault { .. } => StatusCode::InvalidArguments, ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } diff --git a/src/metric-engine/src/row_modifier.rs b/src/metric-engine/src/row_modifier.rs index d5ed1cc9b0..7d68f94aa0 100644 --- a/src/metric-engine/src/row_modifier.rs +++ b/src/metric-engine/src/row_modifier.rs @@ -23,6 +23,7 @@ use mito_codec::row_converter::SparsePrimaryKeyCodec; use smallvec::SmallVec; use snafu::ResultExt; use store_api::codec::PrimaryKeyEncoding; +use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{ DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, }; @@ -264,7 +265,10 @@ struct IterIndex { } impl IterIndex { - fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap) -> Self { + fn new( + row_schema: &[ColumnSchema], + physical_columns: &HashMap, + ) -> Self { let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new(); // Uses BTreeMap to keep the primary key column name order (lexicographical) let mut primary_key_indices = BTreeMap::new(); @@ -290,7 +294,10 @@ impl IterIndex { primary_key_indices.insert( col.column_name.as_str(), ValueIndex { - column_id: *name_to_column_id.get(&col.column_name).unwrap(), + column_id: physical_columns + .get(&col.column_name) + .unwrap() + .column_id, index: idx, }, ); @@ -298,13 +305,13 @@ impl IterIndex { }, SemanticType::Field => { field_indices.push(ValueIndex { - column_id: *name_to_column_id.get(&col.column_name).unwrap(), + column_id: physical_columns.get(&col.column_name).unwrap().column_id, index: idx, }); } SemanticType::Timestamp => { ts_index = Some(ValueIndex { - column_id: *name_to_column_id.get(&col.column_name).unwrap(), + column_id: physical_columns.get(&col.column_name).unwrap().column_id, index: idx, }); } @@ -338,8 +345,8 @@ pub struct RowsIter { } impl RowsIter { - pub fn new(rows: Rows, name_to_column_id: &HashMap) -> Self { - let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id); + pub fn new(rows: Rows, physical_columns: &HashMap) -> Self { + let index: IterIndex = IterIndex::new(&rows.schema, physical_columns); Self { rows, index } } @@ -455,8 +462,23 @@ mod tests { } } - fn test_name_to_column_id() -> HashMap { - HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)]) + fn make_info(name: &str, column_id: ColumnId) -> ColumnMetadata { + ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + name.to_string(), + datatypes::prelude::ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id, + } + } + + fn test_name_to_column_id() -> HashMap { + HashMap::from([ + ("namespace".to_string(), make_info("namespace", 1)), + ("host".to_string(), make_info("host", 2)), + ]) } #[test] @@ -657,11 +679,11 @@ mod tests { } /// Helper function to create a name_to_column_id map - fn create_name_to_column_id(labels: &[&str]) -> HashMap { + fn create_name_to_column_id(labels: &[&str]) -> HashMap { labels .iter() .enumerate() - .map(|(idx, name)| (name.to_string(), idx as ColumnId + 1)) + .map(|(idx, name)| (name.to_string(), make_info(name, idx as ColumnId + 1))) .collect() } @@ -692,7 +714,7 @@ mod tests { fn extract_tsid( schema: Vec, row: Row, - name_to_column_id: &HashMap, + name_to_column_id: &HashMap, table_id: TableId, ) -> u64 { let rows = Rows { diff --git a/tests/cases/standalone/common/alter/alter_table.result b/tests/cases/standalone/common/alter/alter_table.result index b1969bbaf7..5ba9fc0ed0 100644 --- a/tests/cases/standalone/common/alter/alter_table.result +++ b/tests/cases/standalone/common/alter/alter_table.result @@ -508,42 +508,42 @@ Affected Rows: 0 ALTER TABLE t1 ADD - COLUMN `at` STRING; + COLUMN `at` STRING PRIMARY KEY; Affected Rows: 0 ALTER TABLE t2 ADD - COLUMN at3 STRING; + COLUMN at3 STRING PRIMARY KEY; Affected Rows: 0 ALTER TABLE t2 ADD - COLUMN `at` STRING; + COLUMN `at` STRING PRIMARY KEY; Affected Rows: 0 ALTER TABLE t2 ADD - COLUMN at2 STRING; + COLUMN at2 STRING PRIMARY KEY; Affected Rows: 0 ALTER TABLE t2 ADD - COLUMN at4 UINT16; + COLUMN at4 STRING PRIMARY KEY; Affected Rows: 0 INSERT INTO t2 VALUES - ("loc_1", "loc_2", "loc_3", 2, 'job1', 0, 1); + ("loc_1", "loc_2", "loc_3", "loc_4", 'job1', 0, 1); Affected Rows: 1 @@ -552,11 +552,11 @@ SELECT FROM t2; -+-------+-------+-------+-----+------+---------------------+-----+ -| at | at2 | at3 | at4 | job | ts | val | -+-------+-------+-------+-----+------+---------------------+-----+ -| loc_1 | loc_2 | loc_3 | 2 | job1 | 1970-01-01T00:00:00 | 1.0 | -+-------+-------+-------+-----+------+---------------------+-----+ ++-------+-------+-------+-------+------+---------------------+-----+ +| at | at2 | at3 | at4 | job | ts | val | ++-------+-------+-------+-------+------+---------------------+-----+ +| loc_1 | loc_2 | loc_3 | loc_4 | job1 | 1970-01-01T00:00:00 | 1.0 | ++-------+-------+-------+-------+------+---------------------+-----+ DROP TABLE t1; diff --git a/tests/cases/standalone/common/alter/alter_table.sql b/tests/cases/standalone/common/alter/alter_table.sql index 6088ea3bd2..767aca0510 100644 --- a/tests/cases/standalone/common/alter/alter_table.sql +++ b/tests/cases/standalone/common/alter/alter_table.sql @@ -184,32 +184,32 @@ CREATE TABLE t2 ( ALTER TABLE t1 ADD - COLUMN `at` STRING; + COLUMN `at` STRING PRIMARY KEY; ALTER TABLE t2 ADD - COLUMN at3 STRING; + COLUMN at3 STRING PRIMARY KEY; ALTER TABLE t2 ADD - COLUMN `at` STRING; + COLUMN `at` STRING PRIMARY KEY; ALTER TABLE t2 ADD - COLUMN at2 STRING; + COLUMN at2 STRING PRIMARY KEY; ALTER TABLE t2 ADD - COLUMN at4 UINT16; + COLUMN at4 STRING PRIMARY KEY; INSERT INTO t2 VALUES - ("loc_1", "loc_2", "loc_3", 2, 'job1', 0, 1); + ("loc_1", "loc_2", "loc_3", "loc_4", 'job1', 0, 1); SELECT * diff --git a/tests/cases/standalone/common/insert/logical_metric_table.result b/tests/cases/standalone/common/insert/logical_metric_table.result index 80e765ccd6..32085175ea 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.result +++ b/tests/cases/standalone/common/insert/logical_metric_table.result @@ -77,6 +77,40 @@ DROP TABLE phy; Affected Rows: 0 +CREATE TABLE phy_default (ts timestamp time index, val double default 42) engine=metric with ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE t_default (ts timestamp time index, val double default 42, host string primary key) engine = metric with ("on_physical_table" = "phy_default"); + +Affected Rows: 0 + +INSERT INTO t_default (host, ts) VALUES ('host1', 0), ('host2', 1); + +Affected Rows: 2 + +SELECT host, ts, val FROM t_default ORDER BY host; + ++-------+-------------------------+------+ +| host | ts | val | ++-------+-------------------------+------+ +| host1 | 1970-01-01T00:00:00 | 42.0 | +| host2 | 1970-01-01T00:00:00.001 | 42.0 | ++-------+-------------------------+------+ + +-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region +INSERT INTO t_default (host, val) VALUES ('host3', 3); + +Error: 1004(InvalidArguments), Invalid request for region, reason: missing required time index column ts + +DROP TABLE t_default; + +Affected Rows: 0 + +DROP TABLE phy_default; + +Affected Rows: 0 + CREATE TABLE phy ( ts timestamp time index, val double diff --git a/tests/cases/standalone/common/insert/logical_metric_table.sql b/tests/cases/standalone/common/insert/logical_metric_table.sql index 9899699c66..6a3bed401d 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.sql +++ b/tests/cases/standalone/common/insert/logical_metric_table.sql @@ -24,6 +24,21 @@ SELECT ts, val, __tsid, host, job FROM phy; DROP TABLE phy; +CREATE TABLE phy_default (ts timestamp time index, val double default 42) engine=metric with ("physical_metric_table" = ""); + +CREATE TABLE t_default (ts timestamp time index, val double default 42, host string primary key) engine = metric with ("on_physical_table" = "phy_default"); + +INSERT INTO t_default (host, ts) VALUES ('host1', 0), ('host2', 1); + +SELECT host, ts, val FROM t_default ORDER BY host; + +-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region +INSERT INTO t_default (host, val) VALUES ('host3', 3); + +DROP TABLE t_default; + +DROP TABLE phy_default; + CREATE TABLE phy ( ts timestamp time index, val double