diff --git a/src/metric-engine/src/batch_modifier.rs b/src/metric-engine/src/batch_modifier.rs index 76d9bb418a..2162215d1b 100644 --- a/src/metric-engine/src/batch_modifier.rs +++ b/src/metric-engine/src/batch_modifier.rs @@ -201,6 +201,7 @@ mod tests { use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datatypes::arrow::record_batch::RecordBatch; use store_api::codec::PrimaryKeyEncoding; + use store_api::metadata::ColumnMetadata; use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME; use super::*; @@ -364,11 +365,23 @@ mod tests { let modified = modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap(); - let name_to_column_id: HashMap = [ - ("greptime_timestamp".to_string(), 0), - ("greptime_value".to_string(), 1), - ("namespace".to_string(), 2), - ("host".to_string(), 3), + let make_info = |name: &str, column_id: ColumnId| ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + name.to_string(), + datatypes::prelude::ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id, + }; + let name_to_column_id: HashMap = [ + ( + "greptime_timestamp".to_string(), + make_info("greptime_timestamp", 0), + ), + ("greptime_value".to_string(), make_info("greptime_value", 1)), + ("namespace".to_string(), make_info("namespace", 2)), + ("host".to_string(), make_info("host", 3)), ] .into_iter() .collect(); diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 9bc22e1102..b64d61a287 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::SemanticType; -use common_telemetry::{debug, info, warn}; +use common_telemetry::{debug, info}; use datatypes::schema::{SkippingIndexOptions, SkippingIndexType}; use mito2::engine::MitoEngine; use snafu::ResultExt; @@ -27,8 +27,8 @@ use store_api::storage::{ConcreteDataType, RegionId}; use crate::engine::IndexOptions; use crate::error::{ - ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, MitoReadOperationSnafu, - MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu, + AddingFieldColumnSnafu, ColumnTypeMismatchSnafu, ForbiddenPhysicalAlterSnafu, + MitoReadOperationSnafu, MitoWriteOperationSnafu, Result, SetSkippingIndexOptionSnafu, }; use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_DDL_DURATION, PHYSICAL_COLUMN_COUNT}; use crate::utils; @@ -132,10 +132,10 @@ impl DataRegion { .fail(); } } else { - warn!( - "Column {} in region {region_id} is not a tag", - c.column_schema.name - ); + return AddingFieldColumnSnafu { + name: &c.column_schema.name, + } + .fail(); }; c.column_id = new_column_id_start + delta as u32; diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 4b6b67f31b..892e0c91c2 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -189,7 +189,7 @@ impl MetricEngineInner { let new_add_columns = new_column_names.iter().map(|name| { // Safety: previous steps ensure the physical region exist let column_metadata = *physical_schema_map.get(name).unwrap(); - (name.to_string(), column_metadata.column_id) + (name.to_string(), column_metadata.clone()) }); // Writes logical regions metadata to metadata region diff --git a/src/metric-engine/src/engine/alter/extract_new_columns.rs b/src/metric-engine/src/engine/alter/extract_new_columns.rs index fdb1ef6126..c3bef1ba4f 100644 --- a/src/metric-engine/src/engine/alter/extract_new_columns.rs +++ b/src/metric-engine/src/engine/alter/extract_new_columns.rs @@ -14,11 +14,13 @@ use std::collections::{HashMap, HashSet}; +use api::v1::SemanticType; +use snafu::ensure; use store_api::metadata::ColumnMetadata; use store_api::region_request::{AlterKind, RegionAlterRequest}; -use store_api::storage::{ColumnId, RegionId}; +use store_api::storage::RegionId; -use crate::error::Result; +use crate::error::{AddingFieldColumnSnafu, Result}; /// Extract new columns from the create requests. /// @@ -27,7 +29,7 @@ use crate::error::Result; /// This function will panic if the alter kind is not `AddColumns`. pub fn extract_new_columns<'a>( requests: &'a [(RegionId, RegionAlterRequest)], - physical_columns: &HashMap, + physical_columns: &HashMap, new_column_names: &mut HashSet<&'a str>, new_columns: &mut Vec, ) -> Result<()> { @@ -40,6 +42,12 @@ pub fn extract_new_columns<'a>( if !physical_columns.contains_key(column_name) && !new_column_names.contains(column_name) { + ensure!( + col.column_metadata.semantic_type != SemanticType::Field, + AddingFieldColumnSnafu { + name: column_name.to_string(), + } + ); new_column_names.insert(column_name); // TODO(weny): avoid clone new_columns.push(col.column_metadata.clone()); @@ -49,3 +57,55 @@ pub fn extract_new_columns<'a>( Ok(()) } + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + + use api::v1::SemanticType; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use store_api::metadata::ColumnMetadata; + use store_api::region_request::{AddColumn, AlterKind, RegionAlterRequest}; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + + #[test] + fn test_extract_new_columns_with_field_type() { + let requests = vec![( + RegionId::new(1, 1), + RegionAlterRequest { + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "new_column".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Field, + column_id: 0, + }, + location: None, + }], + }, + }, + )]; + + let physical_columns = HashMap::new(); + let mut new_column_names = HashSet::new(); + let mut new_columns = Vec::new(); + + let err = extract_new_columns( + &requests, + &physical_columns, + &mut new_column_names, + &mut new_columns, + ) + .unwrap_err(); + + assert!(matches!(err, Error::AddingFieldColumn { .. })); + } +} diff --git a/src/metric-engine/src/engine/bulk_insert.rs b/src/metric-engine/src/engine/bulk_insert.rs index 942dae1136..24c9e7934c 100644 --- a/src/metric-engine/src/engine/bulk_insert.rs +++ b/src/metric-engine/src/engine/bulk_insert.rs @@ -190,13 +190,13 @@ impl MetricEngineInner { for (index, field) in batch.schema().fields().iter().enumerate() { let name = field.name(); - let column_id = - *physical_columns - .get(name) - .with_context(|| error::ColumnNotFoundSnafu { - name: name.clone(), - region_id: logical_region_id, - })?; + let column_id = physical_columns + .get(name) + .map(|info| info.column_id) + .with_context(|| error::ColumnNotFoundSnafu { + name: name.clone(), + region_id: logical_region_id, + })?; if tag_names.contains(name.as_str()) { tag_columns.push(TagColumnInfo { name: name.clone(), diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 131ec1dbd4..34aeb5577c 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -145,7 +145,7 @@ impl MetricEngineInner { let physical_columns = create_data_region_request .column_metadatas .iter() - .map(|metadata| (metadata.column_schema.name.clone(), metadata.column_id)) + .map(|metadata| (metadata.column_schema.name.clone(), metadata.clone())) .collect::>(); let time_index_unit = create_data_region_request .column_metadatas @@ -321,7 +321,7 @@ impl MetricEngineInner { let new_add_columns = new_column_names.iter().map(|name| { // Safety: previous steps ensure the physical region exist let column_metadata = *physical_schema_map.get(name).unwrap(); - (name.to_string(), column_metadata.column_id) + (name.to_string(), column_metadata.clone()) }); extension_return_value.insert( diff --git a/src/metric-engine/src/engine/create/extract_new_columns.rs b/src/metric-engine/src/engine/create/extract_new_columns.rs index 9d1de9ebb2..82cc53ce3d 100644 --- a/src/metric-engine/src/engine/create/extract_new_columns.rs +++ b/src/metric-engine/src/engine/create/extract_new_columns.rs @@ -18,14 +18,14 @@ use api::v1::SemanticType; use snafu::ensure; use store_api::metadata::ColumnMetadata; use store_api::region_request::RegionCreateRequest; -use store_api::storage::{ColumnId, RegionId}; +use store_api::storage::RegionId; use crate::error::{AddingFieldColumnSnafu, Result}; /// Extract new columns from the create requests. pub fn extract_new_columns<'a>( requests: &'a [(RegionId, RegionCreateRequest)], - physical_columns: &HashMap, + physical_columns: &HashMap, new_column_names: &mut HashSet<&'a str>, new_columns: &mut Vec, ) -> Result<()> { @@ -123,7 +123,18 @@ mod tests { ]; let mut physical_columns = HashMap::new(); - physical_columns.insert("existing_column".to_string(), 0); + physical_columns.insert( + "existing_column".to_string(), + ColumnMetadata { + column_schema: ColumnSchema::new( + "existing_column".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }, + ); let mut new_column_names = HashSet::new(); let mut new_columns = Vec::new(); diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 70e3280183..59b1cfd928 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -326,7 +326,7 @@ impl MetricEngineInner { .unwrap(); let physical_columns = physical_columns .into_iter() - .map(|col| (col.column_schema.name, col.column_id)) + .map(|col| (col.column_schema.name.clone(), col)) .collect(); state.add_physical_region( physical_region_id, diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index edae0d2bb4..07adfae120 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -14,13 +14,16 @@ use std::collections::HashMap; +use api::helper::ColumnDataTypeWrapper; use api::v1::{ - ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, Value, WriteHint, + ColumnSchema, PrimaryKeyEncoding as PrimaryKeyEncodingProto, Row, Rows, SemanticType, Value, + WriteHint, }; use common_telemetry::{error, info}; use fxhash::FxHashMap; -use snafu::{OptionExt, ensure}; +use snafu::{OptionExt, ResultExt, ensure}; use store_api::codec::PrimaryKeyEncoding; +use store_api::metadata::ColumnMetadata; use store_api::region_request::{ AffectedRows, RegionDeleteRequest, RegionPutRequest, RegionRequest, }; @@ -28,8 +31,9 @@ use store_api::storage::{RegionId, TableId}; use crate::engine::MetricEngineInner; use crate::error::{ - ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu, - LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnsupportedRegionRequestSnafu, + ColumnNotFoundSnafu, CreateDefaultSnafu, ForbiddenPhysicalAlterSnafu, InvalidRequestSnafu, + LogicalRegionNotFoundSnafu, PhysicalRegionNotFoundSnafu, Result, UnexpectedRequestSnafu, + UnsupportedRegionRequestSnafu, }; use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED}; use crate::row_modifier::{RowsIter, TableIdInput}; @@ -116,7 +120,7 @@ impl MetricEngineInner { async fn put_regions_batch_single_physical( &self, physical_region_id: RegionId, - requests: Vec<(RegionId, RegionPutRequest)>, + mut requests: Vec<(RegionId, RegionPutRequest)>, ) -> Result { if requests.is_empty() { return Ok(0); @@ -126,7 +130,7 @@ impl MetricEngineInner { let primary_key_encoding = self.get_primary_key_encoding(data_region_id)?; // Validate all requests - self.validate_batch_requests(physical_region_id, &requests) + self.validate_batch_requests(physical_region_id, &mut requests) .await?; // Merge requests according to encoding strategy @@ -157,11 +161,16 @@ impl MetricEngineInner { async fn validate_batch_requests( &self, physical_region_id: RegionId, - requests: &[(RegionId, RegionPutRequest)], + requests: &mut [(RegionId, RegionPutRequest)], ) -> Result<()> { for (logical_region_id, request) in requests { - self.verify_rows(*logical_region_id, physical_region_id, &request.rows) - .await?; + self.verify_rows( + *logical_region_id, + physical_region_id, + &mut request.rows, + true, + ) + .await?; } Ok(()) } @@ -248,7 +257,7 @@ impl MetricEngineInner { // Batch-modify all rows (add __table_id and __tsid columns) let final_rows = { let state = self.state.read().unwrap(); - let name_to_id = state + let physical_columns = state .physical_region_states() .get(&data_region_id) .with_context(|| PhysicalRegionNotFoundSnafu { @@ -261,7 +270,7 @@ impl MetricEngineInner { schema: merged_schema, rows: merged_rows, }, - name_to_id, + physical_columns, ); self.row_modifier.modify_rows( @@ -406,8 +415,13 @@ impl MetricEngineInner { let (physical_region_id, data_region_id, primary_key_encoding) = self.find_data_region_meta(logical_region_id)?; - self.verify_rows(logical_region_id, physical_region_id, &request.rows) - .await?; + self.verify_rows( + logical_region_id, + physical_region_id, + &mut request.rows, + true, + ) + .await?; // write to data region // TODO: retrieve table name @@ -439,8 +453,13 @@ impl MetricEngineInner { let (physical_region_id, data_region_id, primary_key_encoding) = self.find_data_region_meta(logical_region_id)?; - self.verify_rows(logical_region_id, physical_region_id, &request.rows) - .await?; + self.verify_rows( + logical_region_id, + physical_region_id, + &mut request.rows, + false, + ) + .await?; // write to data region // TODO: retrieve table name @@ -484,12 +503,18 @@ impl MetricEngineInner { /// /// Includes: /// - Check if the logical region exists - /// - Check if the columns exist + /// - Check if every column in the request exists in the physical region + /// - Check each column's datatype and semantic type match the physical region's schema + /// - Check the time index column is present + /// - When `check_fields` is true, check every physical field column is present. + /// Set this to `false` for delete requests, which legitimately carry only + /// the primary key + timestamp. async fn verify_rows( &self, logical_region_id: RegionId, physical_region_id: RegionId, - rows: &Rows, + rows: &mut Rows, + check_fields: bool, ) -> Result<()> { // Check if the region exists let data_region_id = to_data_region_id(physical_region_id); @@ -502,22 +527,137 @@ impl MetricEngineInner { .fail(); } - // Check if a physical column exists - let physical_columns = state + // Type + semantic check on every column in the request schema. + let physical_state = state .physical_region_states() .get(&data_region_id) .context(PhysicalRegionNotFoundSnafu { region_id: data_region_id, - })? - .physical_columns(); + })?; + let physical_columns = physical_state.physical_columns(); for col in &rows.schema { - ensure!( - physical_columns.contains_key(&col.column_name), - ColumnNotFoundSnafu { - name: col.column_name.clone(), + let info = physical_columns + .get(&col.column_name) + .context(ColumnNotFoundSnafu { + name: &col.column_name, region_id: logical_region_id, + })?; + + ensure!( + api::helper::is_column_type_value_eq( + col.datatype, + col.datatype_extension.clone(), + &info.column_schema.data_type + ), + InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!( + "column {} expect type {:?}, given: {}({})", + col.column_name, + info.column_schema.data_type, + api::v1::ColumnDataType::try_from(col.datatype) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), + col.datatype, + ), } ); + + ensure!( + api::helper::is_semantic_type_eq(col.semantic_type, info.semantic_type), + InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!( + "column {} expect semantic type {:?}, given: {}({})", + col.column_name, + info.semantic_type, + api::v1::SemanticType::try_from(col.semantic_type) + .map(|v| v.as_str_name()) + .unwrap_or("Unknown"), + col.semantic_type, + ), + } + ); + } + + let ts_name = physical_state.time_index_column_name(); + ensure!( + rows.schema.iter().any(|col| col.column_name == ts_name), + InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!("missing required time index column {ts_name}"), + } + ); + + if check_fields { + let field_name = physical_state.field_column_name(); + if !rows.schema.iter().any(|col| col.column_name == field_name) { + let field_meta = + physical_columns + .get(field_name) + .with_context(|| ColumnNotFoundSnafu { + name: field_name, + region_id: logical_region_id, + })?; + Self::fill_missing_field_column(logical_region_id, field_name, field_meta, rows)?; + } + } + + Ok(()) + } + + fn fill_missing_field_column( + logical_region_id: RegionId, + field_name: &str, + field_meta: &ColumnMetadata, + rows: &mut Rows, + ) -> Result<()> { + ensure!( + !field_meta.column_schema.is_default_impure(), + UnexpectedRequestSnafu { + reason: format!( + "unexpected impure default value with region_id: {logical_region_id}, column: {field_name}, default_value: {:?}", + field_meta.column_schema.default_constraint(), + ), + } + ); + + let default_value = field_meta + .column_schema + .create_default() + .context(CreateDefaultSnafu { + region_id: logical_region_id, + column: field_name, + })? + .with_context(|| InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!("missing required field column {field_name}"), + })?; + let default_value = api::helper::to_grpc_value(default_value); + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(field_meta.column_schema.data_type.clone()) + .map_err(|e| { + InvalidRequestSnafu { + region_id: logical_region_id, + reason: format!( + "no protobuf type for field column {field_name} ({:?}): {e}", + field_meta.column_schema.data_type + ), + } + .build() + })? + .to_parts(); + + rows.schema.push(ColumnSchema { + column_name: field_name.to_string(), + datatype: datatype as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension, + options: None, + }); + + for row in &mut rows.rows { + row.values.push(default_value.clone()); } Ok(()) @@ -536,14 +676,14 @@ impl MetricEngineInner { let input = std::mem::take(rows); let iter = { let state = self.state.read().unwrap(); - let name_to_id = state + let physical_columns = state .physical_region_states() .get(&physical_region_id) .with_context(|| PhysicalRegionNotFoundSnafu { region_id: physical_region_id, })? .physical_columns(); - RowsIter::new(input, name_to_id) + RowsIter::new(input, physical_columns) }; let output = self.row_modifier @@ -557,12 +697,17 @@ impl MetricEngineInner { mod tests { use std::collections::HashSet; + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_function::utils::partition_expr_version; use common_recordbatch::RecordBatches; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use datatypes::value::Value as PartitionValue; use partition::expr::col; + use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{ DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, @@ -1258,4 +1403,271 @@ mod tests { .unwrap(); assert_eq!(response.affected_rows, 3); } + + /// Regression test for issue #7990: the metric engine must reject a row + /// whose timestamp column carries a non-timestamp datatype, rather than + /// letting it panic inside mito's `ValueBuilder::push`. + #[tokio::test] + async fn test_verify_rows_rejects_wrong_type() { + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, SemanticType}; + use common_query::prelude::{greptime_timestamp, greptime_value}; + + let env = TestEnv::new().await; + env.init_metric_region().await; + + let logical_region_id = env.default_logical_region_id(); + + // Timestamp column is declared as String — the very payload that + // caused #7990. It should surface a typed error rather than panic. + let schema = vec![ + PbColumnSchema { + column_name: greptime_timestamp().to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, + options: None, + }, + PbColumnSchema { + column_name: greptime_value().to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as _, + datatype_extension: None, + options: None, + }, + PbColumnSchema { + column_name: "job".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ]; + let rows = vec![Row { + values: vec![ + Value { + value_data: Some(ValueData::StringValue("not-a-timestamp".to_string())), + }, + Value { + value_data: Some(ValueData::F64Value(1.0)), + }, + Value { + value_data: Some(ValueData::StringValue("tag_0".to_string())), + }, + ], + }]; + + let err = env + .metric() + .handle_request( + logical_region_id, + RegionRequest::Put(RegionPutRequest { + rows: Rows { schema, rows }, + hint: None, + partition_expr_version: None, + }), + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + /// The completeness check must reject requests that omit the time index + /// column, since mito cannot default-fill a `TimeIndex` column and would + /// previously panic on the empty builder. + #[tokio::test] + async fn test_verify_rows_rejects_missing_time_index() { + use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, SemanticType}; + use common_query::prelude::greptime_value; + + let env = TestEnv::new().await; + env.init_metric_region().await; + + let logical_region_id = env.default_logical_region_id(); + + // Payload only carries the field and a tag — no timestamp column. + let schema = vec![ + PbColumnSchema { + column_name: greptime_value().to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as _, + datatype_extension: None, + options: None, + }, + PbColumnSchema { + column_name: "job".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ]; + let rows = vec![Row { + values: vec![ + Value { + value_data: Some(api::v1::value::ValueData::F64Value(1.0)), + }, + Value { + value_data: Some(api::v1::value::ValueData::StringValue("tag_0".to_string())), + }, + ], + }]; + + let err = env + .metric() + .handle_request( + logical_region_id, + RegionRequest::Put(RegionPutRequest { + rows: Rows { schema, rows }, + hint: None, + partition_expr_version: None, + }), + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[tokio::test] + async fn test_verify_rows_rejects_missing_field() { + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, SemanticType}; + use common_query::prelude::greptime_timestamp; + + let env = TestEnv::new().await; + env.init_metric_region().await; + + let logical_region_id = env.default_logical_region_id(); + + // Schema has timestamp + tag but no field column. + let schema = vec![ + PbColumnSchema { + column_name: greptime_timestamp().to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, + options: None, + }, + PbColumnSchema { + column_name: "job".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as _, + datatype_extension: None, + options: None, + }, + ]; + let rows = vec![Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }, + Value { + value_data: Some(ValueData::StringValue("tag_0".to_string())), + }, + ], + }]; + + let err = env + .metric() + .handle_request( + logical_region_id, + RegionRequest::Put(RegionPutRequest { + rows: Rows { schema, rows }, + hint: None, + partition_expr_version: None, + }), + ) + .await + .unwrap_err(); + let message = err.to_string(); + assert!( + message.contains("missing required field column"), + "expected field-completeness rejection, got: {message}" + ); + assert_eq!(err.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_fill_missing_field_column_nullable_no_default() { + let field_meta = ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + "greptime_value".to_string(), + ConcreteDataType::float64_datatype(), + true, // nullable, no default + ), + }; + let mut rows = Rows { + schema: vec![PbColumnSchema { + column_name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, + options: None, + }], + rows: vec![Row { + values: vec![Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }], + }], + }; + + MetricEngineInner::fill_missing_field_column( + RegionId::new(1, 1), + "greptime_value", + &field_meta, + &mut rows, + ) + .unwrap(); + + assert_eq!(rows.schema.len(), 2); + assert_eq!(rows.schema[1].column_name, "greptime_value"); + assert_eq!(rows.rows[0].values.len(), 2); + assert!( + rows.rows[0].values[1].value_data.is_none(), + "missing nullable field should be filled with null" + ); + } + + #[test] + fn test_fill_missing_field_column_rejects_impure_default() { + let field_meta = ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + "greptime_value".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ) + .with_default_constraint(Some(ColumnDefaultConstraint::Function("now()".to_string()))) + .unwrap(), + }; + let mut rows = Rows { + schema: vec![PbColumnSchema { + column_name: "ts".to_string(), + datatype: api::v1::ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as _, + datatype_extension: None, + options: None, + }], + rows: vec![Row { + values: vec![Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)), + }], + }], + }; + + let err = MetricEngineInner::fill_missing_field_column( + RegionId::new(1, 1), + "greptime_value", + &field_meta, + &mut rows, + ) + .unwrap_err(); + assert!( + err.to_string().contains("impure default value"), + "expected impure-default rejection, got: {err}" + ); + } } diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 7c64758f48..25416380f1 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -16,11 +16,13 @@ use std::collections::{HashMap, HashSet}; +use api::v1::SemanticType; +use common_telemetry::warn; use common_time::timestamp::TimeUnit; use snafu::OptionExt; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::ColumnMetadata; -use store_api::storage::{ColumnId, RegionId}; +use store_api::storage::RegionId; use crate::engine::options::PhysicalRegionOptions; use crate::error::{PhysicalRegionNotFoundSnafu, Result}; @@ -29,7 +31,16 @@ use crate::utils::to_data_region_id; pub struct PhysicalRegionState { logical_regions: HashSet, - physical_columns: HashMap, + physical_columns: HashMap, + /// Name of the time index column, cached at region load so that the write + /// path doesn't have to scan `physical_columns` for the timestamp on every + /// row batch. The time index is fixed at region creation and never + /// changes, so this stays in sync with `physical_columns`. + time_index_column_name: String, + /// Name of the field column. Metric regions have exactly one field column + /// verified at creation time, so the write path can validate completeness + /// without consulting per-logical-region metadata. + field_column_name: String, primary_key_encoding: PrimaryKeyEncoding, options: PhysicalRegionOptions, time_index_unit: TimeUnit, @@ -37,14 +48,29 @@ pub struct PhysicalRegionState { impl PhysicalRegionState { pub fn new( - physical_columns: HashMap, + physical_columns: HashMap, primary_key_encoding: PrimaryKeyEncoding, options: PhysicalRegionOptions, time_index_unit: TimeUnit, ) -> Self { + // Safety: a valid physical region always has exactly one time index + // column; callers validate this before reaching here (see + // `create_data_region_request` and the open path). + let time_index_column_name = physical_columns + .iter() + .find(|(_, meta)| meta.semantic_type == SemanticType::Timestamp) + .map(|(name, _)| name.clone()) + .unwrap_or_default(); + let field_column_name = physical_columns + .iter() + .find(|(_, meta)| meta.semantic_type == SemanticType::Field) + .map(|(name, _)| name.clone()) + .unwrap_or_default(); Self { logical_regions: HashSet::new(), physical_columns, + time_index_column_name, + field_column_name, primary_key_encoding, options, time_index_unit, @@ -57,10 +83,20 @@ impl PhysicalRegionState { } /// Returns a reference to the physical columns. - pub fn physical_columns(&self) -> &HashMap { + pub fn physical_columns(&self) -> &HashMap { &self.physical_columns } + /// Returns the cached name of the time index column. + pub fn time_index_column_name(&self) -> &str { + &self.time_index_column_name + } + + /// Returns the cached name of the field column. + pub fn field_column_name(&self) -> &str { + &self.field_column_name + } + /// Returns a reference to the physical region options. pub fn options(&self) -> &PhysicalRegionOptions { &self.options @@ -90,7 +126,7 @@ impl MetricEngineState { pub fn add_physical_region( &mut self, physical_region_id: RegionId, - physical_columns: HashMap, + physical_columns: HashMap, primary_key_encoding: PrimaryKeyEncoding, options: PhysicalRegionOptions, time_index_unit: TimeUnit, @@ -112,12 +148,25 @@ impl MetricEngineState { pub fn add_physical_columns( &mut self, physical_region_id: RegionId, - physical_columns: impl IntoIterator, + physical_columns: impl IntoIterator, ) { let physical_region_id = to_data_region_id(physical_region_id); let state = self.physical_regions.get_mut(&physical_region_id).unwrap(); - for (col, id) in physical_columns { - state.physical_columns.insert(col, id); + for (col, meta) in physical_columns { + // The time index is fixed at region creation and alter cannot add + // a new one; keep the cached name in sync defensively. + debug_assert_ne!( + meta.semantic_type, + SemanticType::Timestamp, + "unexpected time index column {col} added to an existing physical region" + ); + if meta.semantic_type == SemanticType::Field { + warn!( + "Unexpected field column {col} added to physical region {physical_region_id}; cached field column remains {}", + state.field_column_name + ); + } + state.physical_columns.insert(col, meta); } } diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 6fb3406826..284b1b0298 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -343,6 +343,19 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to create default value for column {} of region {}", + column, + region_id + ))] + CreateDefault { + region_id: RegionId, + column: String, + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Unexpected request: {}", reason))] UnexpectedRequest { reason: String, @@ -394,7 +407,8 @@ impl ErrorExt for Error { | UnsupportedAlterKind { .. } | UnsupportedRemapManifestsRequest { .. } | UnsupportedSyncRegionFromRequest { .. } - | InvalidRequest { .. } => StatusCode::InvalidArguments, + | InvalidRequest { .. } + | CreateDefault { .. } => StatusCode::InvalidArguments, ForbiddenPhysicalAlter { .. } | UnsupportedRegionRequest { .. } diff --git a/src/metric-engine/src/row_modifier.rs b/src/metric-engine/src/row_modifier.rs index d5ed1cc9b0..7d68f94aa0 100644 --- a/src/metric-engine/src/row_modifier.rs +++ b/src/metric-engine/src/row_modifier.rs @@ -23,6 +23,7 @@ use mito_codec::row_converter::SparsePrimaryKeyCodec; use smallvec::SmallVec; use snafu::ResultExt; use store_api::codec::PrimaryKeyEncoding; +use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{ DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, }; @@ -264,7 +265,10 @@ struct IterIndex { } impl IterIndex { - fn new(row_schema: &[ColumnSchema], name_to_column_id: &HashMap) -> Self { + fn new( + row_schema: &[ColumnSchema], + physical_columns: &HashMap, + ) -> Self { let mut reserved_indices = SmallVec::<[ValueIndex; 2]>::new(); // Uses BTreeMap to keep the primary key column name order (lexicographical) let mut primary_key_indices = BTreeMap::new(); @@ -290,7 +294,10 @@ impl IterIndex { primary_key_indices.insert( col.column_name.as_str(), ValueIndex { - column_id: *name_to_column_id.get(&col.column_name).unwrap(), + column_id: physical_columns + .get(&col.column_name) + .unwrap() + .column_id, index: idx, }, ); @@ -298,13 +305,13 @@ impl IterIndex { }, SemanticType::Field => { field_indices.push(ValueIndex { - column_id: *name_to_column_id.get(&col.column_name).unwrap(), + column_id: physical_columns.get(&col.column_name).unwrap().column_id, index: idx, }); } SemanticType::Timestamp => { ts_index = Some(ValueIndex { - column_id: *name_to_column_id.get(&col.column_name).unwrap(), + column_id: physical_columns.get(&col.column_name).unwrap().column_id, index: idx, }); } @@ -338,8 +345,8 @@ pub struct RowsIter { } impl RowsIter { - pub fn new(rows: Rows, name_to_column_id: &HashMap) -> Self { - let index: IterIndex = IterIndex::new(&rows.schema, name_to_column_id); + pub fn new(rows: Rows, physical_columns: &HashMap) -> Self { + let index: IterIndex = IterIndex::new(&rows.schema, physical_columns); Self { rows, index } } @@ -455,8 +462,23 @@ mod tests { } } - fn test_name_to_column_id() -> HashMap { - HashMap::from([("namespace".to_string(), 1), ("host".to_string(), 2)]) + fn make_info(name: &str, column_id: ColumnId) -> ColumnMetadata { + ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema::new( + name.to_string(), + datatypes::prelude::ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id, + } + } + + fn test_name_to_column_id() -> HashMap { + HashMap::from([ + ("namespace".to_string(), make_info("namespace", 1)), + ("host".to_string(), make_info("host", 2)), + ]) } #[test] @@ -657,11 +679,11 @@ mod tests { } /// Helper function to create a name_to_column_id map - fn create_name_to_column_id(labels: &[&str]) -> HashMap { + fn create_name_to_column_id(labels: &[&str]) -> HashMap { labels .iter() .enumerate() - .map(|(idx, name)| (name.to_string(), idx as ColumnId + 1)) + .map(|(idx, name)| (name.to_string(), make_info(name, idx as ColumnId + 1))) .collect() } @@ -692,7 +714,7 @@ mod tests { fn extract_tsid( schema: Vec, row: Row, - name_to_column_id: &HashMap, + name_to_column_id: &HashMap, table_id: TableId, ) -> u64 { let rows = Rows { diff --git a/tests/cases/standalone/common/alter/alter_table.result b/tests/cases/standalone/common/alter/alter_table.result index b1969bbaf7..5ba9fc0ed0 100644 --- a/tests/cases/standalone/common/alter/alter_table.result +++ b/tests/cases/standalone/common/alter/alter_table.result @@ -508,42 +508,42 @@ Affected Rows: 0 ALTER TABLE t1 ADD - COLUMN `at` STRING; + COLUMN `at` STRING PRIMARY KEY; Affected Rows: 0 ALTER TABLE t2 ADD - COLUMN at3 STRING; + COLUMN at3 STRING PRIMARY KEY; Affected Rows: 0 ALTER TABLE t2 ADD - COLUMN `at` STRING; + COLUMN `at` STRING PRIMARY KEY; Affected Rows: 0 ALTER TABLE t2 ADD - COLUMN at2 STRING; + COLUMN at2 STRING PRIMARY KEY; Affected Rows: 0 ALTER TABLE t2 ADD - COLUMN at4 UINT16; + COLUMN at4 STRING PRIMARY KEY; Affected Rows: 0 INSERT INTO t2 VALUES - ("loc_1", "loc_2", "loc_3", 2, 'job1', 0, 1); + ("loc_1", "loc_2", "loc_3", "loc_4", 'job1', 0, 1); Affected Rows: 1 @@ -552,11 +552,11 @@ SELECT FROM t2; -+-------+-------+-------+-----+------+---------------------+-----+ -| at | at2 | at3 | at4 | job | ts | val | -+-------+-------+-------+-----+------+---------------------+-----+ -| loc_1 | loc_2 | loc_3 | 2 | job1 | 1970-01-01T00:00:00 | 1.0 | -+-------+-------+-------+-----+------+---------------------+-----+ ++-------+-------+-------+-------+------+---------------------+-----+ +| at | at2 | at3 | at4 | job | ts | val | ++-------+-------+-------+-------+------+---------------------+-----+ +| loc_1 | loc_2 | loc_3 | loc_4 | job1 | 1970-01-01T00:00:00 | 1.0 | ++-------+-------+-------+-------+------+---------------------+-----+ DROP TABLE t1; diff --git a/tests/cases/standalone/common/alter/alter_table.sql b/tests/cases/standalone/common/alter/alter_table.sql index 6088ea3bd2..767aca0510 100644 --- a/tests/cases/standalone/common/alter/alter_table.sql +++ b/tests/cases/standalone/common/alter/alter_table.sql @@ -184,32 +184,32 @@ CREATE TABLE t2 ( ALTER TABLE t1 ADD - COLUMN `at` STRING; + COLUMN `at` STRING PRIMARY KEY; ALTER TABLE t2 ADD - COLUMN at3 STRING; + COLUMN at3 STRING PRIMARY KEY; ALTER TABLE t2 ADD - COLUMN `at` STRING; + COLUMN `at` STRING PRIMARY KEY; ALTER TABLE t2 ADD - COLUMN at2 STRING; + COLUMN at2 STRING PRIMARY KEY; ALTER TABLE t2 ADD - COLUMN at4 UINT16; + COLUMN at4 STRING PRIMARY KEY; INSERT INTO t2 VALUES - ("loc_1", "loc_2", "loc_3", 2, 'job1', 0, 1); + ("loc_1", "loc_2", "loc_3", "loc_4", 'job1', 0, 1); SELECT * diff --git a/tests/cases/standalone/common/insert/logical_metric_table.result b/tests/cases/standalone/common/insert/logical_metric_table.result index 80e765ccd6..32085175ea 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.result +++ b/tests/cases/standalone/common/insert/logical_metric_table.result @@ -77,6 +77,40 @@ DROP TABLE phy; Affected Rows: 0 +CREATE TABLE phy_default (ts timestamp time index, val double default 42) engine=metric with ("physical_metric_table" = ""); + +Affected Rows: 0 + +CREATE TABLE t_default (ts timestamp time index, val double default 42, host string primary key) engine = metric with ("on_physical_table" = "phy_default"); + +Affected Rows: 0 + +INSERT INTO t_default (host, ts) VALUES ('host1', 0), ('host2', 1); + +Affected Rows: 2 + +SELECT host, ts, val FROM t_default ORDER BY host; + ++-------+-------------------------+------+ +| host | ts | val | ++-------+-------------------------+------+ +| host1 | 1970-01-01T00:00:00 | 42.0 | +| host2 | 1970-01-01T00:00:00.001 | 42.0 | ++-------+-------------------------+------+ + +-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region +INSERT INTO t_default (host, val) VALUES ('host3', 3); + +Error: 1004(InvalidArguments), Invalid request for region, reason: missing required time index column ts + +DROP TABLE t_default; + +Affected Rows: 0 + +DROP TABLE phy_default; + +Affected Rows: 0 + CREATE TABLE phy ( ts timestamp time index, val double diff --git a/tests/cases/standalone/common/insert/logical_metric_table.sql b/tests/cases/standalone/common/insert/logical_metric_table.sql index 9899699c66..6a3bed401d 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.sql +++ b/tests/cases/standalone/common/insert/logical_metric_table.sql @@ -24,6 +24,21 @@ SELECT ts, val, __tsid, host, job FROM phy; DROP TABLE phy; +CREATE TABLE phy_default (ts timestamp time index, val double default 42) engine=metric with ("physical_metric_table" = ""); + +CREATE TABLE t_default (ts timestamp time index, val double default 42, host string primary key) engine = metric with ("on_physical_table" = "phy_default"); + +INSERT INTO t_default (host, ts) VALUES ('host1', 0), ('host2', 1); + +SELECT host, ts, val FROM t_default ORDER BY host; + +-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region +INSERT INTO t_default (host, val) VALUES ('host3', 3); + +DROP TABLE t_default; + +DROP TABLE phy_default; + CREATE TABLE phy ( ts timestamp time index, val double