diff --git a/src/common/meta/src/ddl/create_logical_tables/region_request.rs b/src/common/meta/src/ddl/create_logical_tables/region_request.rs index ea204078d3..4f1b0da7d8 100644 --- a/src/common/meta/src/ddl/create_logical_tables/region_request.rs +++ b/src/common/meta/src/ddl/create_logical_tables/region_request.rs @@ -102,6 +102,6 @@ pub fn create_region_request_builder_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, ) -> Result { - let template = build_template_from_raw_table_info(raw_table_info, false)?; + let template = build_template_from_raw_table_info(raw_table_info)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/common/meta/src/ddl/create_table/template.rs b/src/common/meta/src/ddl/create_table/template.rs index 51cbe999f3..6cb5e66c31 100644 --- a/src/common/meta/src/ddl/create_table/template.rs +++ b/src/common/meta/src/ddl/create_table/template.rs @@ -20,55 +20,51 @@ use api::v1::region::{CreateRequest, RegionColumnDef}; use api::v1::{ColumnDef, CreateTableExpr, SemanticType}; use common_telemetry::warn; use snafu::{OptionExt, ResultExt}; -use store_api::metric_engine_consts::{ - LOGICAL_TABLE_METADATA_KEY, is_metric_engine_internal_column, -}; +use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::{RawTableInfo, TableId}; use crate::error::{self, Result}; +use crate::reconciliation::utils::build_column_metadata_from_table_info; use crate::wal_provider::prepare_wal_options; /// Constructs a [CreateRequest] based on the provided [RawTableInfo]. /// -/// Note: This function is primarily intended for creating logical tables or allocating placeholder regions. -pub fn build_template_from_raw_table_info( - raw_table_info: &RawTableInfo, - skip_internal_columns: bool, -) -> Result { +/// Note: This function is primarily intended for creating logical tables. +/// +/// Logical table templates keep the original column order and primary key indices from +/// `RawTableInfo` (including internal columns when present), because these are used to +/// reconstruct the logical schema on the engine side. +pub fn build_template_from_raw_table_info(raw_table_info: &RawTableInfo) -> Result { let primary_key_indices = &raw_table_info.meta.primary_key_indices; - let filtered = raw_table_info + let column_defs = raw_table_info .meta .schema .column_schemas .iter() .enumerate() - .filter(|(_, c)| !skip_internal_columns || !is_metric_engine_internal_column(&c.name)) .map(|(i, c)| { let is_primary_key = primary_key_indices.contains(&i); let column_def = try_as_column_def(c, is_primary_key) .context(error::ConvertColumnDefSnafu { column: &c.name })?; - Ok(( - is_primary_key.then_some(i), - RegionColumnDef { - column_def: Some(column_def), - // The column id will be overridden by the metric engine. - // So we just use the index as the column id. - column_id: i as u32, - }, - )) + Ok(RegionColumnDef { + column_def: Some(column_def), + // The column id will be overridden by the metric engine. + // So we just use the index as the column id. + column_id: i as u32, + }) }) - .collect::, RegionColumnDef)>>>()?; + .collect::>>()?; - let (new_primary_key_indices, column_defs): (Vec<_>, Vec<_>) = filtered.into_iter().unzip(); let options = HashMap::from(&raw_table_info.meta.options); let template = CreateRequest { region_id: 0, engine: raw_table_info.meta.engine.clone(), column_defs, - primary_key: new_primary_key_indices + primary_key: raw_table_info + .meta + .primary_key_indices .iter() - .flatten() .map(|i| *i as u32) .collect(), path: String::new(), @@ -79,6 +75,60 @@ pub fn build_template_from_raw_table_info( Ok(template) } +/// Constructs a [CreateRequest] based on the provided [RawTableInfo] for physical table. +/// +/// Note: This function is primarily intended for creating physical table. +/// +/// Physical table templates mark primary +/// keys by tag semantic type to match the physical storage layout. +pub fn build_template_from_raw_table_info_for_physical_table( + raw_table_info: &RawTableInfo, +) -> Result { + let name_to_ids = raw_table_info + .name_to_ids() + .context(error::MissingColumnIdsSnafu)?; + let column_metadatas = build_column_metadata_from_table_info( + &raw_table_info.meta.schema.column_schemas, + &raw_table_info.meta.primary_key_indices, + &name_to_ids, + )?; + let primary_key_ids = column_metadatas + .iter() + .filter(|c| c.semantic_type == SemanticType::Tag) + .map(|c| c.column_id) + .collect::>(); + let column_defs = column_metadatas + .iter() + .map(|c| { + let column_def = + try_as_column_def(&c.column_schema, c.semantic_type == SemanticType::Tag).context( + error::ConvertColumnDefSnafu { + column: &c.column_schema.name, + }, + )?; + let region_column_def = RegionColumnDef { + column_def: Some(column_def), + column_id: c.column_id, + }; + + Ok(region_column_def) + }) + .collect::>>()?; + + let options = HashMap::from(&raw_table_info.meta.options); + let template = CreateRequest { + region_id: 0, + engine: raw_table_info.meta.engine.clone(), + column_defs, + primary_key: primary_key_ids, + path: String::new(), + options, + partition: None, + }; + + Ok(template) +} + pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result { let column_defs = create_table_expr .column_defs diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index ca61abe12d..d88e402adf 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -467,7 +467,10 @@ pub fn extract_column_metadatas( ensure!( column_metadata == first_column_metadatas, MetadataCorruptionSnafu { - err_msg: "The table column metadata schemas from datanodes are not the same." + err_msg: format!( + "The table column metadata schemas from datanodes are not the same. First: {:?}, Current: {:?}", + first_column_metadatas, column_metadata, + ), } ); } diff --git a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs index 598fae4781..17abfe70d1 100644 --- a/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs +++ b/src/common/meta/src/reconciliation/reconcile_logical_tables/reconcile_regions.rs @@ -150,7 +150,7 @@ fn create_region_request_from_raw_table_info( raw_table_info: &RawTableInfo, physical_table_id: TableId, ) -> Result { - let template = build_template_from_raw_table_info(raw_table_info, false)?; + let template = build_template_from_raw_table_info(raw_table_info)?; Ok(CreateRequestBuilder::new(template, Some(physical_table_id))) } diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index a9dd663c2c..f6e0aeed93 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -71,6 +71,30 @@ impl RecordBatch { }) } + pub fn to_df_record_batch>( + arrow_schema: ArrowSchemaRef, + columns: I, + ) -> Result { + let columns: Vec<_> = columns.into_iter().collect(); + let arrow_arrays = columns.iter().map(|v| v.to_arrow_array()).collect(); + + // Casting the arrays here to match the schema, is a temporary solution to support Arrow's + // view array types (`StringViewArray` and `BinaryViewArray`). + // As to "support": the arrays here are created from vectors, which do not have types + // corresponding to view arrays. What we can do is to only cast them. + // As to "temporary": we are planing to use Arrow's RecordBatch directly in the read path. + // the casting here will be removed in the end. + // TODO(LFC): Remove the casting here once `Batch` is no longer used. + let arrow_arrays = Self::cast_view_arrays(&arrow_schema, arrow_arrays)?; + + let arrow_arrays = maybe_align_json_array_with_schema(&arrow_schema, arrow_arrays)?; + + let df_record_batch = DfRecordBatch::try_new(arrow_schema, arrow_arrays) + .context(error::NewDfRecordBatchSnafu)?; + + Ok(df_record_batch) + } + fn cast_view_arrays( schema: &ArrowSchemaRef, mut arrays: Vec, diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index aefb3d4b21..6f35c3feec 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -17,7 +17,7 @@ use std::collections::{HashMap, HashSet}; use common_meta::ddl::create_table::executor::CreateTableExecutor; use common_meta::ddl::create_table::template::{ - CreateRequestBuilder, build_template_from_raw_table_info, + CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table, }; use common_meta::lock_key::TableLock; use common_meta::node_manager::NodeManagerRef; @@ -268,8 +268,15 @@ impl AllocateRegion { &raw_table_info.name, ); let table_id = raw_table_info.ident.table_id; - let request = build_template_from_raw_table_info(raw_table_info, true) + // Repartition allocation targets physical regions, so exclude metric internal columns + // and derive primary keys from tag semantics. + let request = build_template_from_raw_table_info_for_physical_table(raw_table_info) .context(error::BuildCreateRequestSnafu { table_id })?; + common_telemetry::debug!( + "Allocating regions request, table_id: {}, request: {:?}", + table_id, + request + ); let builder = CreateRequestBuilder::new(request, None); let region_count = region_routes.len(); let wal_region_count = wal_options.len(); diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 925241ba08..131ec1dbd4 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -31,6 +31,7 @@ use store_api::metric_engine_consts::{ METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME, + is_metric_engine_internal_column, }; use store_api::mito_engine_options::{TTL_KEY, WAL_OPTIONS_KEY}; use store_api::region_engine::RegionEngine; @@ -361,15 +362,18 @@ impl MetricEngineInner { .map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx)) .collect::>(); - // check if internal columns are not occupied + let table_id_col_def = request.column_metadatas.iter().any(is_metric_name_col); + let tsid_col_def = request.column_metadatas.iter().any(is_tsid_col); + + // check if internal columns are not occupied or defined in the request ensure!( - !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME), + !name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME) || table_id_col_def, InternalColumnOccupiedSnafu { column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME, } ); ensure!( - !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME), + !name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME) || tsid_col_def, InternalColumnOccupiedSnafu { column: DATA_SCHEMA_TSID_COLUMN_NAME, } @@ -389,6 +393,10 @@ impl MetricEngineInner { // check if only one field column is declared, and all tag columns are string let mut field_col: Option<&ColumnMetadata> = None; for col in &request.column_metadatas { + // Verified in above steps. + if is_metric_engine_internal_column(&col.column_schema.name) { + continue; + } match col.semantic_type { SemanticType::Tag => ensure!( col.column_schema.data_type == ConcreteDataType::string_datatype(), @@ -508,21 +516,30 @@ impl MetricEngineInner { data_region_request.table_dir = request.table_dir.clone(); data_region_request.path_type = PathType::Data; + let table_id_col_def = request.column_metadatas.iter().any(is_metric_name_col); + let tsid_col_def = request.column_metadatas.iter().any(is_tsid_col); + // change nullability for tag columns data_region_request .column_metadatas .iter_mut() .for_each(|metadata| { - if metadata.semantic_type == SemanticType::Tag { + if metadata.semantic_type == SemanticType::Tag + && !is_metric_name_col(metadata) + && !is_tsid_col(metadata) + { metadata.column_schema.set_nullable(); primary_key.push(metadata.column_id); } }); - // add internal columns - let [table_id_col, tsid_col] = Self::internal_column_metadata(); - data_region_request.column_metadatas.push(table_id_col); - data_region_request.column_metadatas.push(tsid_col); + // add internal columns if not defined in the request + if !table_id_col_def { + data_region_request.column_metadatas.push(table_id_col()); + } + if !tsid_col_def { + data_region_request.column_metadatas.push(tsid_col()); + } data_region_request.primary_key = primary_key; // set data region options @@ -533,41 +550,57 @@ impl MetricEngineInner { data_region_request } +} - /// Generate internal column metadata. - /// - /// Return `[table_id_col, tsid_col]` - fn internal_column_metadata() -> [ColumnMetadata; 2] { - // Safety: BloomFilter is a valid skipping index type - let metric_name_col = ColumnMetadata { - column_id: ReservedColumnId::table_id(), - semantic_type: SemanticType::Tag, - column_schema: ColumnSchema::new( - DATA_SCHEMA_TABLE_ID_COLUMN_NAME, - ConcreteDataType::uint32_datatype(), - false, - ) - .with_skipping_options(SkippingIndexOptions::new_unchecked( - DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY, - DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE, - datatypes::schema::SkippingIndexType::BloomFilter, - )) - .unwrap(), - }; - let tsid_col = ColumnMetadata { - column_id: ReservedColumnId::tsid(), - semantic_type: SemanticType::Tag, - column_schema: ColumnSchema::new( - DATA_SCHEMA_TSID_COLUMN_NAME, - ConcreteDataType::uint64_datatype(), - false, - ) - .with_inverted_index(false), - }; - [metric_name_col, tsid_col] +fn table_id_col() -> ColumnMetadata { + ColumnMetadata { + column_id: ReservedColumnId::table_id(), + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, + ConcreteDataType::uint32_datatype(), + false, + ) + .with_skipping_options(SkippingIndexOptions::new_unchecked( + DEFAULT_TABLE_ID_SKIPPING_INDEX_GRANULARITY, + DEFAULT_TABLE_ID_SKIPPING_INDEX_FALSE_POSITIVE_RATE, + datatypes::schema::SkippingIndexType::BloomFilter, + )) + .unwrap(), } } +fn tsid_col() -> ColumnMetadata { + ColumnMetadata { + column_id: ReservedColumnId::tsid(), + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + DATA_SCHEMA_TSID_COLUMN_NAME, + ConcreteDataType::uint64_datatype(), + false, + ) + .with_inverted_index(false), + } +} + +/// Returns true if the column is the metric name column. +pub(crate) fn is_metric_name_col(column: &ColumnMetadata) -> bool { + column.column_id == ReservedColumnId::table_id() + && column.semantic_type == SemanticType::Tag + && column.column_schema.data_type == ConcreteDataType::uint32_datatype() + && column.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME + && !column.column_schema.is_nullable() +} + +/// Returns true if the column is the tsid column. +pub(crate) fn is_tsid_col(column: &ColumnMetadata) -> bool { + column.column_id == ReservedColumnId::tsid() + && column.semantic_type == SemanticType::Tag + && column.column_schema.data_type == ConcreteDataType::uint64_datatype() + && column.column_schema.name == DATA_SCHEMA_TSID_COLUMN_NAME + && !column.column_schema.is_nullable() +} + /// Groups the create logical region requests by physical region id. fn group_create_logical_region_requests_by_physical_region_id( requests: Vec<(RegionId, RegionCreateRequest)>, @@ -628,6 +661,14 @@ mod test { use crate::engine::MetricEngine; use crate::test_util::{TestEnv, create_logical_region_request}; + #[test] + fn test_internal_column_metadata() { + let table_id_col = table_id_col(); + let tsid_col = tsid_col(); + assert!(is_metric_name_col(&table_id_col)); + assert!(is_tsid_col(&tsid_col)); + } + #[test] fn test_verify_region_create_request() { // internal column is occupied @@ -666,6 +707,50 @@ mod test { "Internal column __table_id is reserved".to_string() ); + // allow reserved internal columns when defined properly + let request = RegionCreateRequest { + column_metadatas: vec![ + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "column1".to_string(), + ConcreteDataType::string_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 2, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + "column2".to_string(), + ConcreteDataType::float64_datatype(), + false, + ), + }, + table_id_col(), + tsid_col(), + ], + table_dir: "test_dir".to_string(), + path_type: PathType::Bare, + engine: METRIC_ENGINE_NAME.to_string(), + primary_key: vec![], + options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] + .into_iter() + .collect(), + partition_expr_json: Some("".to_string()), + }; + MetricEngineInner::verify_region_create_request(&request).unwrap(); + // valid request let request = RegionCreateRequest { column_metadatas: vec![ @@ -820,6 +905,100 @@ mod test { assert!(!metadata_region_request.options.contains_key("skip_wal")); } + #[tokio::test] + async fn test_create_request_for_physical_regions_with_internal_columns() { + let options: HashMap<_, _> = [ + ("ttl".to_string(), "60m".to_string()), + ("skip_wal".to_string(), "true".to_string()), + ] + .into_iter() + .collect(); + let request = RegionCreateRequest { + engine: METRIC_ENGINE_NAME.to_string(), + column_metadatas: vec![ + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Timestamp, + column_schema: ColumnSchema::new( + "timestamp", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 1, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "tag", + ConcreteDataType::string_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 2, + semantic_type: SemanticType::Field, + column_schema: ColumnSchema::new( + "value", + ConcreteDataType::float64_datatype(), + false, + ), + }, + table_id_col(), + tsid_col(), + ], + primary_key: vec![0], + options, + table_dir: "/test_dir".to_string(), + path_type: PathType::Bare, + partition_expr_json: Some("".to_string()), + }; + + let env = TestEnv::new().await; + let engine = MetricEngine::try_new(env.mito(), EngineConfig::default()).unwrap(); + let engine_inner = engine.inner; + + let data_region_request = engine_inner.create_request_for_data_region(&request); + assert_eq!(data_region_request.column_metadatas.len(), 5); + assert_eq!( + data_region_request.primary_key, + vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 1] + ); + + let table_id_count = data_region_request + .column_metadatas + .iter() + .filter(|metadata| metadata.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME) + .count(); + let tsid_count = data_region_request + .column_metadatas + .iter() + .filter(|metadata| metadata.column_schema.name == DATA_SCHEMA_TSID_COLUMN_NAME) + .count(); + assert_eq!(table_id_count, 1); + assert_eq!(tsid_count, 1); + + let tag_metadata = data_region_request + .column_metadatas + .iter() + .find(|metadata| metadata.column_schema.name == "tag") + .unwrap(); + assert!(tag_metadata.column_schema.is_nullable()); + + let table_id_metadata = data_region_request + .column_metadatas + .iter() + .find(|metadata| metadata.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME) + .unwrap(); + assert!(is_metric_name_col(table_id_metadata)); + + let tsid_metadata = data_region_request + .column_metadatas + .iter() + .find(|metadata| metadata.column_schema.name == DATA_SCHEMA_TSID_COLUMN_NAME) + .unwrap(); + assert!(is_tsid_col(tsid_metadata)); + } + #[tokio::test] async fn test_create_logical_regions() { let env = TestEnv::new().await; diff --git a/src/metric-engine/src/engine/staging.rs b/src/metric-engine/src/engine/staging.rs index 9db500957c..6cef0fad34 100644 --- a/src/metric-engine/src/engine/staging.rs +++ b/src/metric-engine/src/engine/staging.rs @@ -15,12 +15,11 @@ use common_base::AffectedRows; use snafu::ResultExt; use store_api::region_engine::RegionEngine; -use store_api::region_request::{EnterStagingRequest, RegionRequest}; +use store_api::region_request::RegionRequest; use store_api::storage::RegionId; use crate::engine::MetricEngine; use crate::error::{MitoEnterStagingOperationSnafu, Result}; -use crate::utils; impl MetricEngine { /// Handles the enter staging request for the given region. @@ -29,24 +28,11 @@ impl MetricEngine { region_id: RegionId, request: RegionRequest, ) -> Result { - let metadata_region_id = utils::to_metadata_region_id(region_id); - let data_region_id = utils::to_data_region_id(region_id); - - // For metadata region, it doesn't care about the partition expr, so we can just pass an empty string. + // We don't need to enter staging for metadata region. + // Callers should pass the data region id here; metadata regions stay unchanged. self.inner .mito - .handle_request( - metadata_region_id, - RegionRequest::EnterStaging(EnterStagingRequest { - partition_expr: String::new(), - }), - ) - .await - .context(MitoEnterStagingOperationSnafu)?; - - self.inner - .mito - .handle_request(data_region_id, request) + .handle_request(region_id, request) .await .context(MitoEnterStagingOperationSnafu) .map(|response| response.affected_rows) diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index f1c7d59013..70ffa827cf 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -97,6 +97,7 @@ impl BulkIterContext { codec, // we don't need to compat batch since all batch in memtable have the same schema. compat_batch: None, + compaction_projection_mapper: None, pre_filter_mode, partition_filter: None, }, diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 02819f2728..7394a3c4ab 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use api::v1::SemanticType; use common_error::ext::BoxedError; -use common_recordbatch::RecordBatch; use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu}; +use common_recordbatch::{DfRecordBatch, RecordBatch}; use datatypes::arrow::datatypes::Field; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; @@ -28,7 +28,7 @@ use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; -use crate::error::{InvalidRequestSnafu, Result}; +use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result}; use crate::read::projection::read_column_ids_from_projection; use crate::sst::parquet::flat_format::sst_column_id_indices; use crate::sst::parquet::format::FormatProjection; @@ -252,7 +252,15 @@ impl FlatProjectionMapper { if self.is_empty_projection { return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows()); } + let columns = self.project_vectors(batch)?; + RecordBatch::new(self.output_schema.clone(), columns) + } + /// Projects columns from the input batch and converts them into vectors. + pub(crate) fn project_vectors( + &self, + batch: &datatypes::arrow::record_batch::RecordBatch, + ) -> common_recordbatch::error::Result> { let mut columns = Vec::with_capacity(self.output_schema.num_columns()); for index in &self.batch_indices { let mut array = batch.column(*index).clone(); @@ -269,8 +277,7 @@ impl FlatProjectionMapper { .context(ExternalSnafu)?; columns.push(vector); } - - RecordBatch::new(self.output_schema.clone(), columns) + Ok(columns) } } @@ -341,3 +348,99 @@ fn compute_input_arrow_schema( Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields)) } + +/// Helper to project compaction batches into flat format columns +/// (fields + time index + __primary_key + __sequence + __op_type). +pub(crate) struct CompactionProjectionMapper { + mapper: FlatProjectionMapper, + assembler: DfBatchAssembler, +} + +impl CompactionProjectionMapper { + pub(crate) fn try_new(metadata: &RegionMetadataRef) -> Result { + let projection = metadata + .column_metadatas + .iter() + .enumerate() + .filter_map(|(idx, col)| { + if matches!(col.semantic_type, SemanticType::Field) { + Some(idx) + } else { + None + } + }) + .chain([metadata.time_index_column_pos()]) + .collect::>(); + + let mapper = FlatProjectionMapper::new_with_read_columns( + metadata, + projection, + metadata + .column_metadatas + .iter() + .map(|col| col.column_id) + .collect(), + )?; + let assembler = DfBatchAssembler::new(mapper.output_schema()); + + Ok(Self { mapper, assembler }) + } + + /// Projects columns and appends internal columns for compaction output. + /// + /// The input batch is expected to be in flat format with internal columns appended. + pub(crate) fn project(&self, batch: DfRecordBatch) -> Result { + let columns = self + .mapper + .project_vectors(&batch) + .context(RecordBatchSnafu)?; + self.assembler + .build_df_record_batch_with_internal(&batch, columns) + .context(RecordBatchSnafu) + } +} + +/// Builds [DfRecordBatch] with internal columns appended. +pub(crate) struct DfBatchAssembler { + output_arrow_schema_with_internal: datatypes::arrow::datatypes::SchemaRef, +} + +impl DfBatchAssembler { + /// Precomputes the output schema with internal columns. + pub(crate) fn new(output_schema: SchemaRef) -> Self { + let fields = output_schema + .arrow_schema() + .fields() + .into_iter() + .chain(internal_fields().iter()) + .cloned() + .collect::>(); + let output_arrow_schema_with_internal = + Arc::new(datatypes::arrow::datatypes::Schema::new(fields)); + Self { + output_arrow_schema_with_internal, + } + } + + /// Builds a [DfRecordBatch] from projected vectors plus internal columns. + /// + /// Assumes the input batch already contains internal columns as the last three fields + /// ("__primary_key", "__sequence", "__op_type"). + pub(crate) fn build_df_record_batch_with_internal( + &self, + batch: &datatypes::arrow::record_batch::RecordBatch, + mut columns: Vec, + ) -> common_recordbatch::error::Result { + let num_columns = batch.columns().len(); + // The last 3 columns are the internal columns. + let internal_indices = [num_columns - 3, num_columns - 2, num_columns - 1]; + for index in internal_indices.iter() { + let array = batch.column(*index).clone(); + let vector = Helper::try_into_vector(array) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + columns.push(vector); + } + RecordBatch::to_df_record_batch(self.output_arrow_schema_with_internal.clone(), columns) + } +} diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 0022ff7549..c25635a88b 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -1443,7 +1443,16 @@ pub fn build_flat_file_range_scan_stream( }) }) .transpose()?; + + let mapper = range.compaction_projection_mapper(); while let Some(record_batch) = reader.next_batch()? { + let record_batch = if let Some(mapper) = mapper { + let batch = mapper.project(record_batch)?; + batch + } else { + record_batch + }; + if let Some(flat_compat) = may_compat { let batch = flat_compat.compat(record_batch)?; yield batch; diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 79f00cdfed..d3b80fd86b 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -186,7 +186,7 @@ impl MitoRegion { } /// Returns current metadata of the region. - pub(crate) fn metadata(&self) -> RegionMetadataRef { + pub fn metadata(&self) -> RegionMetadataRef { let version_data = self.version_control.current(); version_data.version.metadata.clone() } diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 52b706fa0e..fa5a03a9dc 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -44,6 +44,7 @@ use crate::error::{ }; use crate::read::Batch; use crate::read::compat::CompatBatch; +use crate::read::flat_projection::CompactionProjectionMapper; use crate::read::last_row::RowGroupLastRowCachedReader; use crate::read::prune::{FlatPruneReader, PruneReader}; use crate::sst::file::FileHandle; @@ -269,6 +270,11 @@ impl FileRange { self.context.compat_batch() } + /// Returns the helper to project batches. + pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> { + self.context.compaction_projection_mapper() + } + /// Returns the file handle of the file range. pub(crate) fn file_handle(&self) -> &FileHandle { self.context.reader_builder.file_handle() @@ -324,6 +330,11 @@ impl FileRangeContext { self.base.compat_batch.as_ref() } + /// Returns the helper to project batches. + pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> { + self.base.compaction_projection_mapper.as_ref() + } + /// Sets the `CompatBatch` to the context. pub(crate) fn set_compat_batch(&mut self, compat: Option) { self.base.compat_batch = compat; @@ -406,6 +417,8 @@ pub(crate) struct RangeBase { pub(crate) codec: Arc, /// Optional helper to compat batches. pub(crate) compat_batch: Option, + /// Optional helper to project batches. + pub(crate) compaction_projection_mapper: Option, /// Mode to pre-filter columns. pub(crate) pre_filter_mode: PreFilterMode, /// Partition filter. @@ -628,10 +641,12 @@ impl RangeBase { continue; } - // Get the column directly by its projected index + // Get the column directly by its projected index. + // If the column is missing and it's not a tag/time column, this filter is skipped. + // Assumes the projection indices align with the input batch schema. let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id()); if let Some(idx) = column_idx { - let column = &input.columns()[idx]; + let column = &input.columns().get(idx).unwrap(); let result = filter.evaluate_array(column).context(RecordBatchSnafu)?; mask = mask.bitand(&result); } else if filter_ctx.semantic_type() == SemanticType::Tag { diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index f83752f037..d6b061e468 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -257,6 +257,9 @@ impl FlatReadFormat { } /// Gets the projection in the flat format. + /// + /// When `skip_auto_convert` is enabled (primary-key format read), this returns the + /// primary-key format projection so filter/prune can resolve projected indices. pub(crate) fn format_projection(&self) -> &FormatProjection { match &self.parquet_adapter { ParquetAdapter::Flat(p) => &p.format_projection, @@ -393,20 +396,29 @@ impl ParquetPrimaryKeyToFlat { let id_to_index = sst_column_id_indices(&metadata); let sst_column_num = flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default()); - // Computes the format projection for the new format. - let format_projection = FormatProjection::compute_format_projection( - &id_to_index, - sst_column_num, - column_ids.iter().copied(), - ); - let codec = build_primary_key_codec(&metadata); - let convert_format = if skip_auto_convert { - None - } else { - FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec) - }; + let codec = build_primary_key_codec(&metadata); let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied()); + let (convert_format, format_projection) = if skip_auto_convert { + ( + None, + FormatProjection { + projection_indices: format.projection_indices().to_vec(), + column_id_to_projected_index: format.field_id_to_projected_index().clone(), + }, + ) + } else { + // Computes the format projection for the new format. + let format_projection = FormatProjection::compute_format_projection( + &id_to_index, + sst_column_num, + column_ids.iter().copied(), + ); + ( + FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec), + format_projection, + ) + }; Self { format, diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 4d66292696..06461e9dbf 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -488,6 +488,11 @@ impl PrimaryKeyReadFormat { &self.projection_indices } + /// Gets the field id to projected index. + pub(crate) fn field_id_to_projected_index(&self) -> &HashMap { + &self.field_id_to_projected_index + } + /// Creates a sequence array to override. pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option { self.override_sequence diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index c221ddffc1..8f7c540463 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -36,7 +36,9 @@ use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels}; use parquet::file::metadata::{KeyValue, PageIndexPolicy, ParquetMetaData}; +use partition::expr::PartitionExpr; use snafu::{OptionExt, ResultExt}; +use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; use store_api::region_request::PathType; use store_api::storage::{ColumnId, FileId}; @@ -54,6 +56,7 @@ use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, READ_ROWS_TOTAL, READ_STAGE_ELAPSED, }; +use crate::read::flat_projection::CompactionProjectionMapper; use crate::read::prune::{PruneReader, Source}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileHandle; @@ -336,6 +339,37 @@ impl ParquetReaderBuilder { let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); // Gets the metadata stored in the SST. let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?); + let region_partition_expr = self + .expected_metadata + .as_ref() + .and_then(|meta| meta.partition_expr.as_ref()); + let (_, is_same_region_partition) = Self::is_same_region_partition( + region_partition_expr.as_ref().map(|expr| expr.as_str()), + self.file_handle.meta_ref().partition_expr.as_ref(), + )?; + // Skip auto convert when: + // - compaction is enabled + // - region partition expr is same with file partition expr (no need to auto convert) + let skip_auto_convert = self.compaction && is_same_region_partition; + + // Build a compaction projection helper when: + // - compaction is enabled + // - region partition expr differs from file partition expr + // - flat format is enabled + // - primary key encoding is sparse + // + // This is applied after row-group filtering to align batches with flat output schema + // before compat handling. + let compaction_projection_mapper = if self.compaction + && !is_same_region_partition + && self.flat_format + && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse + { + Some(CompactionProjectionMapper::try_new(®ion_meta)?) + } else { + None + }; + let mut read_format = if let Some(column_ids) = &self.projection { ReadFormat::new( region_meta.clone(), @@ -343,7 +377,7 @@ impl ParquetReaderBuilder { self.flat_format, Some(parquet_meta.file_metadata().schema_descr().num_columns()), &file_path, - self.compaction, + skip_auto_convert, )? } else { // Lists all column ids to read, we always use the expected metadata if possible. @@ -359,7 +393,7 @@ impl ParquetReaderBuilder { self.flat_format, Some(parquet_meta.file_metadata().schema_descr().num_columns()), &file_path, - self.compaction, + skip_auto_convert, )? }; if self.decode_primary_key_values { @@ -454,6 +488,7 @@ impl ParquetReaderBuilder { prune_schema, codec, compat_batch: None, + compaction_projection_mapper, pre_filter_mode: self.pre_filter_mode, partition_filter, }, @@ -464,6 +499,19 @@ impl ParquetReaderBuilder { Ok((context, selection)) } + fn is_same_region_partition( + region_partition_expr_str: Option<&str>, + file_partition_expr: Option<&PartitionExpr>, + ) -> Result<(Option, bool)> { + let region_partition_expr = match region_partition_expr_str { + Some(expr_str) => crate::region::parse_partition_expr(Some(expr_str))?, + None => None, + }; + + let is_same = region_partition_expr.as_ref() == file_partition_expr; + Ok((region_partition_expr, is_same)) + } + /// Compare partition expressions from expected metadata and file metadata, /// and build a partition filter if they differ. fn build_partition_filter( @@ -477,19 +525,19 @@ impl ParquetReaderBuilder { .and_then(|meta| meta.partition_expr.as_ref()); let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref(); - let Some(region_str) = region_partition_expr_str else { - return Ok(None); - }; + let (region_partition_expr, is_same_region_partition) = Self::is_same_region_partition( + region_partition_expr_str.map(|s| s.as_str()), + file_partition_expr_ref, + )?; - let Some(region_partition_expr) = crate::region::parse_partition_expr(Some(region_str))? - else { - return Ok(None); - }; - - if Some(®ion_partition_expr) == file_partition_expr_ref { + if is_same_region_partition { return Ok(None); } + let Some(region_partition_expr) = region_partition_expr else { + return Ok(None); + }; + // Collect columns referenced by the partition expression. let mut referenced_columns = HashSet::new(); region_partition_expr.collect_column_names(&mut referenced_columns); diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 7bf31ff587..90207abe1e 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -82,6 +82,7 @@ sqlx = { workspace = true, features = [ "chrono", ] } standalone.workspace = true +store-api.workspace = true substrait.workspace = true table.workspace = true tempfile.workspace = true diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index be9b548f3f..daa2f64919 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -60,12 +60,14 @@ use meta_srv::gc::GcSchedulerOptions; use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; use meta_srv::mocks::MockInfo; use mito2::gc::GcConfig; +use mito2::region::MitoRegionRef; use object_store::config::ObjectStoreConfig; use rand::Rng; use servers::grpc::GrpcOptions; use servers::grpc::flight::FlightCraftWrapper; use servers::grpc::region_server::RegionServerRequestHandler; use servers::server::ServerHandlers; +use store_api::storage::RegionId; use tempfile::TempDir; use tonic::codec::CompressionEncoding; use tonic::transport::Server; @@ -138,6 +140,20 @@ impl GreptimeDbCluster { sst_files } + + pub async fn list_all_regions(&self) -> HashMap { + let mut regions = HashMap::new(); + + for datanode in self.datanode_instances.values() { + let region_server = datanode.region_server(); + let mito = region_server.mito_engine().unwrap(); + for region in mito.regions() { + regions.insert(region.region_id(), region); + } + } + + regions + } } pub struct GreptimeDbClusterBuilder { diff --git a/tests-integration/tests/repartition.rs b/tests-integration/tests/repartition.rs index 4de29c7d87..b1dbee37ef 100644 --- a/tests-integration/tests/repartition.rs +++ b/tests-integration/tests/repartition.rs @@ -30,6 +30,8 @@ use meta_srv::metasrv::Metasrv; use mito2::gc::GcConfig; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; +use store_api::codec::PrimaryKeyEncoding; +use store_api::storage::RegionId; use tests_integration::cluster::GreptimeDbClusterBuilder; use tests_integration::test_util::{StorageType, get_test_store_config}; use tokio::sync::oneshot; @@ -45,7 +47,11 @@ macro_rules! repartition_tests { let store_type = tests_integration::test_util::StorageType::$service; if store_type.test_on() { common_telemetry::init_default_ut_logging(); - $crate::repartition::test_repartition_mito(store_type).await + // Cover both storage formats for repartition behavior. + // for flat format + $crate::repartition::test_repartition_mito(store_type, true).await; + // for primary key format + $crate::repartition::test_repartition_mito(store_type, false).await; } } @@ -53,8 +59,17 @@ macro_rules! repartition_tests { async fn [< test_repartition_metric >]() { let store_type = tests_integration::test_util::StorageType::$service; if store_type.test_on() { + use store_api::codec::PrimaryKeyEncoding; common_telemetry::init_default_ut_logging(); - $crate::repartition::test_repartition_metric(store_type).await + // Exercise format + primary key encoding matrix for metric engine. + // for flat format with sparse primary key encoding + $crate::repartition::test_repartition_metric(store_type, true, PrimaryKeyEncoding::Sparse).await; + // for flat format with dense primary key encoding + $crate::repartition::test_repartition_metric(store_type, true, PrimaryKeyEncoding::Dense).await; + // for primary key format with sparse primary key encoding + $crate::repartition::test_repartition_metric(store_type, false, PrimaryKeyEncoding::Sparse).await; + // for primary key format with dense primary key encoding + $crate::repartition::test_repartition_metric(store_type, false, PrimaryKeyEncoding::Dense).await; } } } @@ -114,7 +129,25 @@ async fn trigger_full_gc(ticker: &GcTickerRef) { let _ = rx.await.unwrap(); } -pub async fn test_repartition_mito(store_type: StorageType) { +fn query_partitions_sql(table_name: &str) -> String { + // We query information_schema.partitions to assert repartition results across engines, + // rather than relying on SHOW CREATE TABLE formatting differences. + format!( + "\ +SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, \ +partition_description, greptime_partition_id, partition_ordinal_position \ +FROM information_schema.partitions \ +WHERE table_name = '{}' \ +ORDER BY partition_ordinal_position;", + table_name + ) +} + +pub async fn test_repartition_mito(store_type: StorageType, flat_format: bool) { + info!( + "test_repartition_mito: store_type: {:?}, flat_format: {:?}", + store_type, flat_format + ); let cluster_name = "test_repartition_mito"; let (store_config, _guard) = get_test_store_config(&store_type); let datanodes = 3u64; @@ -147,8 +180,9 @@ pub async fn test_repartition_mito(store_type: StorageType) { let query_ctx = QueryContext::arc(); let instance = cluster.fe_instance(); - // 1. Setup: Create a table with partitions - let sql = r#" + // 1. Setup: Create a table with partitions (format varies by test case) + let sql = if flat_format { + r#" CREATE TABLE `repartition_mito_table`( `id` INT, `city` STRING, @@ -158,8 +192,25 @@ pub async fn test_repartition_mito(store_type: StorageType) { `id` < 10, `id` >= 10 AND `id` < 20, `id` >= 20 - ); - "#; + ) ENGINE = mito + WITH ( + 'sst_format' = 'flat' + ); + "# + } else { + r#" + CREATE TABLE `repartition_mito_table`( + `id` INT, + `city` STRING, + `ts` TIMESTAMP TIME INDEX, + PRIMARY KEY(`id`, `city`) + ) PARTITION ON COLUMNS (`id`) ( + `id` < 10, + `id` >= 10 AND `id` < 20, + `id` >= 20 + ) ENGINE = mito; + "# + }; run_sql(instance, sql, query_ctx.clone()).await.unwrap(); let sql = r#" @@ -273,30 +324,19 @@ pub async fn test_repartition_mito(store_type: StorageType) { let result = run_sql( instance, - "SHOW CREATE TABLE `repartition_mito_table`", + &query_partitions_sql("repartition_mito_table"), query_ctx.clone(), ) .await .unwrap(); - let expected_create_table_after_split = r#"+------------------------+-------------------------------------------------------+ -| Table | Create Table | -+------------------------+-------------------------------------------------------+ -| repartition_mito_table | CREATE TABLE IF NOT EXISTS "repartition_mito_table" ( | -| | "id" INT NULL, | -| | "city" STRING NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("id", "city") | -| | ) | -| | PARTITION ON COLUMNS ("id") ( | -| | id < 5, | -| | id >= 10 AND id < 20, | -| | id >= 20, | -| | id >= 5 AND id < 10 | -| | ) | -| | ENGINE=mito | -| | | -+------------------------+-------------------------------------------------------+"#; + let expected_create_table_after_split = r#"+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+ +| table_catalog | table_schema | table_name | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position | ++---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+ +| greptime | public | repartition_mito_table | p0 | id | id < 5 | 4398046511104 | 1 | +| greptime | public | repartition_mito_table | p1 | id | id >= 10 AND id < 20 | 4398046511105 | 2 | +| greptime | public | repartition_mito_table | p2 | id | id >= 20 | 4398046511106 | 3 | +| greptime | public | repartition_mito_table | p3 | id | id >= 5 AND id < 10 | 4398046511107 | 4 | ++---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+"#; check_output_stream(result.data, expected_create_table_after_split).await; let sql = @@ -405,29 +445,18 @@ pub async fn test_repartition_mito(store_type: StorageType) { let result = run_sql( instance, - "SHOW CREATE TABLE `repartition_mito_table`", + &query_partitions_sql("repartition_mito_table"), query_ctx.clone(), ) .await .unwrap(); - let expected_create_table_after_merge = r#"+------------------------+-------------------------------------------------------+ -| Table | Create Table | -+------------------------+-------------------------------------------------------+ -| repartition_mito_table | CREATE TABLE IF NOT EXISTS "repartition_mito_table" ( | -| | "id" INT NULL, | -| | "city" STRING NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("id", "city") | -| | ) | -| | PARTITION ON COLUMNS ("id") ( | -| | id < 5, | -| | id >= 10, | -| | id >= 5 AND id < 10 | -| | ) | -| | ENGINE=mito | -| | | -+------------------------+-------------------------------------------------------+"#; + let expected_create_table_after_merge = r#"+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+ +| table_catalog | table_schema | table_name | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position | ++---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+ +| greptime | public | repartition_mito_table | p0 | id | id < 5 | 4398046511104 | 1 | +| greptime | public | repartition_mito_table | p1 | id | id >= 10 | 4398046511105 | 2 | +| greptime | public | repartition_mito_table | p2 | id | id >= 5 AND id < 10 | 4398046511107 | 3 | ++---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+"#; check_output_stream(result.data, expected_create_table_after_merge).await; let sql = @@ -463,7 +492,15 @@ pub async fn test_repartition_mito(store_type: StorageType) { .unwrap(); } -pub async fn test_repartition_metric(store_type: StorageType) { +pub async fn test_repartition_metric( + store_type: StorageType, + flat_format: bool, + primary_key_encoding: PrimaryKeyEncoding, +) { + info!( + "test_repartition_metric: store_type: {:?}, flat_format: {:?}, primary_key_encoding: {:?}", + store_type, flat_format, primary_key_encoding + ); let cluster_name = "test_repartition_metric"; let (store_config, _guard) = get_test_store_config(&store_type); let datanodes = 3u64; @@ -495,7 +532,15 @@ pub async fn test_repartition_metric(store_type: StorageType) { let query_ctx = QueryContext::arc(); let instance = cluster.fe_instance(); - let sql = r#" + // Explicitly configure sst_format and primary key encoding to cover the matrix. + let sst_format = if flat_format { "flat" } else { "primary_key" }; + let primary_key_encoding = match primary_key_encoding { + PrimaryKeyEncoding::Dense => "dense", + PrimaryKeyEncoding::Sparse => "sparse", + }; + + let sql = format!( + r#" CREATE TABLE `repart_phy_metric`( `ts` TIMESTAMP TIME INDEX, `val` DOUBLE, @@ -503,10 +548,20 @@ pub async fn test_repartition_metric(store_type: StorageType) { ) PARTITION ON COLUMNS (`host`) ( `host` < 'm', `host` >= 'm' - ) ENGINE = metric WITH ("physical_metric_table" = ""); - "#; - run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + ) ENGINE = metric + WITH ( + "physical_metric_table" = "", + "memtable.type" = "partition_tree", + 'sst_format' = '{sst_format}', + "memtable.partition_tree.primary_key_encoding" = "{primary_key_encoding}", + "index.type" = "inverted", + ); + "# + ); + run_sql(instance, &sql, query_ctx.clone()).await.unwrap(); + // A second logical table exercises repartition behavior across multiple logical tables + // sharing the same physical metric table. let sql = r#" CREATE TABLE `repart_log_metric`( `ts` TIMESTAMP TIME INDEX, @@ -516,6 +571,15 @@ pub async fn test_repartition_metric(store_type: StorageType) { "#; run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + let sql = r#" + CREATE TABLE `repart_log_metric_job`( + `ts` TIMESTAMP TIME INDEX, + `val` DOUBLE, + `job` STRING PRIMARY KEY + ) ENGINE = metric WITH ("on_physical_table" = "repart_phy_metric"); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + let sql = r#" INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('a_host', '2022-01-01 00:00:00', 1), @@ -552,32 +616,41 @@ pub async fn test_repartition_metric(store_type: StorageType) { let result = run_sql( instance, - "SHOW CREATE TABLE `repart_phy_metric`", + &query_partitions_sql("repart_phy_metric"), query_ctx.clone(), ) .await .unwrap(); - let expected_create_table_after_split = r#"+-------------------+--------------------------------------------------+ -| Table | Create Table | -+-------------------+--------------------------------------------------+ -| repart_phy_metric | CREATE TABLE IF NOT EXISTS "repart_phy_metric" ( | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "val" DOUBLE NULL, | -| | "host" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("host") | -| | ) | -| | PARTITION ON COLUMNS ("host") ( | -| | host < 'g', | -| | host >= 'm', | -| | host >= 'g' AND host < 'm' | -| | ) | -| | ENGINE=metric | -| | WITH( | -| | physical_metric_table = '' | -| | ) | -+-------------------+--------------------------------------------------+"#; + // Partition ids and order are expected to be stable within a single test run. + let expected_create_table_after_split = r#"+---------------+--------------+-------------------+----------------+----------------------+------------------------+-----------------------+----------------------------+ +| table_catalog | table_schema | table_name | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position | ++---------------+--------------+-------------------+----------------+----------------------+------------------------+-----------------------+----------------------------+ +| greptime | public | repart_phy_metric | p0 | host | host < g | 4398046511104 | 1 | +| greptime | public | repart_phy_metric | p1 | host | host >= m | 4398046511105 | 2 | +| greptime | public | repart_phy_metric | p2 | host | host >= g AND host < m | 4398046511106 | 3 | ++---------------+--------------+-------------------+----------------+----------------------+------------------------+-----------------------+----------------------------+"#; check_output_stream(result.data, expected_create_table_after_split).await; + let regions = cluster.list_all_regions().await; + let region0 = regions.get(&RegionId::new(1024, 0)).unwrap(); + let region2 = regions.get(&RegionId::new(1024, 2)).unwrap(); + let primary_keys_in_region_0 = region0 + .metadata() + .primary_key_columns() + .cloned() + .collect::>(); + let primary_keys_in_region_2 = region2 + .metadata() + .primary_key_columns() + .cloned() + .collect::>(); + info!("primary_keys_in_region_0: {:?}", primary_keys_in_region_0); + info!("primary_keys_in_region_2: {:?}", primary_keys_in_region_2); + assert_eq!(primary_keys_in_region_0, primary_keys_in_region_2); + + let sql = r#" + ALTER TABLE `repart_log_metric_job` ADD COLUMN `cpu` STRING PRIMARY KEY; +"#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); let result = run_sql( instance, @@ -671,30 +744,17 @@ pub async fn test_repartition_metric(store_type: StorageType) { let result = run_sql( instance, - "SHOW CREATE TABLE `repart_phy_metric`", + &query_partitions_sql("repart_phy_metric"), query_ctx.clone(), ) .await .unwrap(); - let expected_create_table_after_merge = r#"+-------------------+--------------------------------------------------+ -| Table | Create Table | -+-------------------+--------------------------------------------------+ -| repart_phy_metric | CREATE TABLE IF NOT EXISTS "repart_phy_metric" ( | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | "val" DOUBLE NULL, | -| | "host" STRING NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("host") | -| | ) | -| | PARTITION ON COLUMNS ("host") ( | -| | host < 'm', | -| | host >= 'm' | -| | ) | -| | ENGINE=metric | -| | WITH( | -| | physical_metric_table = '' | -| | ) | -+-------------------+--------------------------------------------------+"#; + let expected_create_table_after_merge = r#"+---------------+--------------+-------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+ +| table_catalog | table_schema | table_name | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position | ++---------------+--------------+-------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+ +| greptime | public | repart_phy_metric | p0 | host | host < m | 4398046511104 | 1 | +| greptime | public | repart_phy_metric | p1 | host | host >= m | 4398046511105 | 2 | ++---------------+--------------+-------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+"#; check_output_stream(result.data, expected_create_table_after_merge).await; let result = run_sql( @@ -780,7 +840,13 @@ pub async fn test_repartition_metric(store_type: StorageType) { | c_host | 2022-01-03T00:00:00 | 5.0 | +--------+---------------------+-----+"; check_output_stream(result.data, expected).await; - + run_sql( + instance, + "DROP TABLE `repart_log_metric_job`", + query_ctx.clone(), + ) + .await + .unwrap(); run_sql( instance, "DROP TABLE `repart_log_metric`",